import zlib
from typing import List
from sortedcontainers import SortedDict, SortedList
[docs]
class AssignmentCompressor:
"""
A class for compressing and decompressing lots of assignments very, very
quickly. Intended for use with ``jsonlines``-like libraries (where
assignments are read in line-by-line) or for network requests (where
assignments are retrieved one-by-one). When decompressing, yields ``dict`` s
where keys are in sorted order.
The compression schema considers the set of unique identifiers, imposes an
ordering (lexicographic order) on the identifiers, and matches the
assignment to that ordering. We assign all unassigned units to ``"-1"``
and, once the default cache size is hit (or assignments are no longer being
read in), compress all assignments in the cache. Assignments are read in
and out in the same order, and the keys for each assignment are in the
same order.
Example:
To compress assignments, we need a set of unique identifiers such that
each identifier maps one geometric unit to one district.
...
geoids = blocks["GEOID20"].astype(str)
ac = AssignmentCompressor(geoids, location="compressed-assignments.ac")
with ac as compressor:
for assignment in assignments:
# Here, ensure that all assignments have string keys and
# string values; also ensure that an assignment's keys are
# a subset of geoids (or whatever IDs you're passing).
compressor.compress(assignment)
...
To decompress assignments, we again must have a set of unique geometric
identifiers which match the assignments. We can then iterate over the
decompressed assignments as they're read out of the file.
...
geoids = blocks["GEOID20"].astype(str)
ac = AssignmentCompressor(geoids, location="compressed-assignments.ac")
for assignment in ac.decompress():
<do whatever!>
...
Attributes:
DISTRICT_DELIMITER (bytes): A bytestring which separates district
identifiers in an assignment.
ASSIGNMENT_DELIMITER (bytes): A bytestring which separates assignments from
each other.
CHUNK_DELIMITER (bytes): A bytestring which separates assignment chunks
from each other.
CHUNK_SIZE (int): Default number of bytes read in from the IO stream at each
step.
ENCODING (str): Default string encoding style.
identifiers: A sortable, iterable collection of unique items corresponding
to geographic identifiers.
compressed: A pandas `Index` containing the identifiers; this is used
to quickly perform vectorized identifier matchings, rather than using
traditional iterative methods.
cache: Collection of assignments to be compressed. Assignments are loaded
into the cache every time the ``.compress()`` method is called, and
is cleared whenever the length of the cache exceeds the window width.
window: Maximum cache length before the cache is compressed, written to
file, and emptied.
default: The default assignment which is updated each time an assignment
is passed to the compressor.
location: The place to which compressed data is written or read.
"""
def __init__(self, identifiers, window=10, location="compressed.ac"):
"""
Creates `AssignmentCompressor` instance.
Args:
identifiers (list): An iterable collection of string identifiers; any
assignment's keys must be a subset of `identifiers`.
window (int, optional): A positive integer representing the cache
window size. Defaults to cache.
location (str, optional): The path to the compressed resource (read
or write). Defaults to `compressed.ac`.
"""
self.identifiers = SortedList(identifiers)
self.default = frozenset(zip(self.identifiers, ["-1"] * len(self.identifiers)))
self.cache = []
self.location = location
# Error to users if the window is nonexistent.
if not isinstance(window, int) or window <= 0:
raise ValueError("Cache window width must be a positive integer.")
self.window = window
self.DISTRICT_DELIMITER = b","
self.ASSIGNMENT_DELIMITER = b"<<<*>>>"
self.CHUNK_DELIMITER = b"(((*)))"
self.CHUNK_SIZE = 16384
self.ENCODING = "raw_unicode_escape"
[docs]
def match(self, assignment) -> SortedDict:
"""
Matches an assignment to an index (the set of geometric identifiers)
and returns a `SortedDict`.
Args:
assignment (dict): Dictionary which matches geometric identifiers to
districts. All keys and values in this dictionary must be strings.
Returns:
A `SortedDict` with identifiers matched ti district assignments.s
"""
# Create a dictionary which maps identifiers to `-1`, and update our
# dictionary with assignment values.
indexer = SortedDict(self.default)
indexer.update(assignment)
return indexer
def __enter__(self):
"""
A simple context-management method. Allows the user to use `with`
statements when compressing stuff so we don't have to worry about the
user specifying the last item they'll be compressing.
"""
return self
def __exit__(self, exc_type, exc_value, exc_tb):
"""
Teardown. Once we've exited the `with` statement (i.e. the user's all done
feeding items to the compressor) we can force the remaining items to be
compressed and written to file.
"""
if self.cache:
self._compress(force=True)
[docs]
def compress_all(self, assignments):
"""
Compresses all assignments in `assignments`.
Args:
assignments (list): List of dictionaries which match geometric identifiers
to districts. All keys and values in these dictionaries must be
strings.
"""
self.window = len(assignments)
with self as ac:
for assignment in assignments:
ac.compress(assignment)
[docs]
def compress(self, assignment):
"""
Compresses the assignment `assignment` using `zlib`.
Args:
assignment (dict): Dictionary which matches geometric identifiers to districts.
All keys and values in this dictionary must be strings.
"""
# If the user provides an empty assignment or the assignment's keys aren't
# a subset of `identifiers`, warn the user and skip the assignment.
skip = False
if not assignment:
skip = True
print("`assignment` is empty; skipping.")
if not set(assignment.keys()).issubset(self.identifiers):
skip = True
print(
"`assignment`'s keys are not a subset of `identifiers`; skipping. "
+ "Please ensure that all keys and values in `assignment` are strings.",
)
# Join the things on the district separator, encode the whole thing, and
# encode according to the default encoding.
if not skip:
indexed = self.match(assignment)
sep = self.DISTRICT_DELIMITER.decode()
encoded = bytes(sep.join(indexed.values()).encode(self.ENCODING))
# Compress.
self.cache += [encoded]
self._compress()
def _compress(self, force=False):
"""
Private method which actually does the compression.
Args:
force: If truthy, then the length of the cache is ignored, the data
in the cache are compressed, and the compressed data is written
to file. ``force`` is truthy when teardown logic is entered
(i.e. after ``__exit()__`` is called).
"""
# Check to see if the cache is full. If so, compress the data, write to
# file, and reset the cache.
if len(self.cache) == self.window or force:
with open(self.location, "ab") as writer:
# Write compressed data to file.
compressed = zlib.compress(
bytes(self.ASSIGNMENT_DELIMITER.join(self.cache))
)
writer.write(compressed)
# We only forcibly write the chunk separator to file if we've
# entered teardown logic and the cache *is not empty*. If the
# cache is empty when we're entering teardown logic, that means
# (number of assignments compressed) == (window width), in which
# case we've reached the end of compression and should not write
# a separator; doing so will produce an empty bytestring (which,
# in turn, produces a dictionary with one key, corresponding to
# a null assignment).
if not force:
writer.write(self.CHUNK_DELIMITER)
# Reset the cache.
self.cache = []
[docs]
def decompress(self):
"""
Decompresses the data at ``location``. A generator which ``yield`` s
assignments.
Yields:
Decompressed assignment dictionaries.
"""
# Open the compressed file. Then we read it in chunks, loading until
# we hit our separator or until the end of the file.
with open(self.location, "rb") as _compressed_fin:
for chunk in self._chunk(_compressed_fin):
if not chunk:
break
for assignment in self._decompress(chunk):
yield assignment
def _chunk(self, stream):
"""
Private method for reading chunks from file without holding the entire
file in memory. A generator for decompressed assignments.
Args:
stream: A ``BytesIO`` instance from which data is read.
Yields:
Buffered, compressed bytes to be fed into the decompressor.
"""
# Create a buffer.
_buffer = []
# Read until we hit the end of the file `yield`ing each chunk as we go.
while True:
# Read in a chunk of data.
chunk = stream.read(self.CHUNK_SIZE)
_buffer.append(chunk)
# Check if the chunk has our delimiter in it. If it contains our
# delimiter, then the buffer *up to the delimiter* contains compressed
# assignments; this should be `yield`ed and decompressed. We only
# want to get the part before the delimiter for decompression, but
# retain the rest.
if self.CHUNK_DELIMITER in chunk:
_buffered_bytes = b"".join(_buffer)
part, _buffer_first = _buffered_bytes.split(self.CHUNK_DELIMITER, 1)
_buffer = [_buffer_first]
yield part
# If the chunk's empty, `yield` the remaining buffer and return.
if not chunk:
yield b"".join(_buffer)
break
def _decompress(self, chunk) -> List[dict]:
"""
Private method which decompresses assignments.
Args:
chunk: Compressed, byte-encoded data representing ``window``
assignments.
Returns:
List of decompressed assignment objects.
"""
# Decompress the chunk and split it on our delimiter.
decompressed = zlib.decompress(chunk)
decompressed_parts = decompressed.split(self.ASSIGNMENT_DELIMITER)
# For each of the parts, decode the bytes, make them into lists, and
# match them to GEOIDs.
decoded_parts = [part.decode() for part in decompressed_parts]
split_parts = [
part.split(self.DISTRICT_DELIMITER.decode()) for part in decoded_parts
]
indexed_parts = [dict(zip(self.identifiers, part)) for part in split_parts]
return indexed_parts