From be69b3921722404a9d0090777d07ade7b6531399 Mon Sep 17 00:00:00 2001
From: joocer
Date: Sun, 17 Mar 2024 00:53:11 +0000
Subject: [PATCH] 0.6.20

---
 mabel/data/writers/batch_writer.py            |   7 +-
 mabel/data/writers/internals/blob_writer.py   |  50 +-----
 .../writers/internals/blob_writer_orso.py     | 167 ------------------
 .../disk/test_writer_disk_writer.py           |   1 -
 tests/test_writer_batch_writer.py             |   1 +
 5 files changed, 11 insertions(+), 215 deletions(-)
 delete mode 100644 mabel/data/writers/internals/blob_writer_orso.py

diff --git a/mabel/data/writers/batch_writer.py b/mabel/data/writers/batch_writer.py
index 51c1ad6..24954a4 100644
--- a/mabel/data/writers/batch_writer.py
+++ b/mabel/data/writers/batch_writer.py
@@ -132,13 +132,14 @@ def finalize(self, **kwargs):

         completion_path = self.blob_writer.inner_writer.filename
         completion_path = os.path.split(completion_path)[0] + "/frame.complete"
-        manifest_path = os.path.split(completion_path)[0] + "/manifest.json"
         self.metadata["records"] = self.records
+        self.metadata["format"] = self.blob_writer.format
         if self.schema:
             if isinstance(self.schema, dict):
                 self.metadata["schema"] = self.schema
-            elif hasattr(self.schema, "definition"):
-                self.metadata["schema"] = self.schema.definition
+            elif hasattr(self.schema, "to_dict"):
+                self.metadata["schema"] = self.schema.to_dict()
+        self.metadata["manifest"] = self.blob_writer.manifest
         flag = self.blob_writer.inner_writer.commit(
             byte_data=orjson.dumps(self.metadata, option=orjson.OPT_INDENT_2),
             override_blob_name=completion_path,
diff --git a/mabel/data/writers/internals/blob_writer.py b/mabel/data/writers/internals/blob_writer.py
index b49747a..e607df5 100644
--- a/mabel/data/writers/internals/blob_writer.py
+++ b/mabel/data/writers/internals/blob_writer.py
@@ -1,7 +1,5 @@
-import datetime
 import io
 import json
-import sys
 import threading

 from typing import Optional
@@ -20,49 +18,12 @@
 SUPPORTED_FORMATS_ALGORITHMS = ("jsonl", "zstd", "parquet", "text", "flat")


-def get_size(obj, seen=None):
-    """
-    Recursively approximate the size of objects.
-    We don't know the actual size until we save, so we approximate the size based
-    on some rules - this will be wrong due to RLE, headers, precision and other
-    factors.
-    """
-    size = sys.getsizeof(obj)
-
-    if seen is None:
-        seen = set()
-    obj_id = id(obj)
-    if obj_id in seen:
-        return 0
-
-    if isinstance(obj, (int, float)):
-        size = 6  # probably 4 bytes, could be 8
-    if isinstance(obj, bool):
-        size = 1
-    if isinstance(obj, (str, bytes, bytearray)):
-        size = len(obj) + 4
-    if obj is None:
-        size = 1
-    if isinstance(obj, datetime.datetime):
-        size = 8
-
-    # Important mark as seen *before* entering recursion to gracefully handle
-    # self-referential objects
-    seen.add(obj_id)
-    if isinstance(obj, dict):
-        size = sum([get_size(v, seen) for v in obj.values()]) + 8
-    elif hasattr(obj, "__dict__"):
-        size += get_size(obj.__dict__, seen) + 8
-    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
-        size += sum([get_size(i, seen) for i in obj]) + 8
-    return size
-
-
 class BlobWriter(object):
     # in som failure scenarios commit is called before __init__, so we need to define
     # this variable outside the __init__.
     buffer = bytearray()
     byte_count = 0
+    manifest = {}

     def __init__(
         self,
@@ -91,13 +52,11 @@ def __init__(
         else:
             self.append = self.text_append

-
-
     def arrow_append(self, record: dict = {}):
         self.records_in_buffer += 1
         self.wal.append(record)  # type:ignore
         # if this write would exceed the blob size, close it
-        if (self.wal.nbytes()) > self.maximum_blob_size and self.records_in_buffer > 0:
+        if self.wal.nbytes() > self.maximum_blob_size:
             self.commit()
             self.open_buffer()

@@ -122,7 +81,7 @@ def text_append(self, record: dict = {}):

         # the newline isn't counted so add 1 to get the actual length if this write
         # would exceed the blob size, close it so another blob will be created
-        if len(self.buffer) > self.maximum_blob_size and self.records_in_buffer > 0:
+        if len(self.buffer) > self.maximum_blob_size:
             self.commit()
             self.open_buffer()

@@ -176,6 +135,7 @@ def commit(self):

         if self.records_in_buffer > 0:
             lock = threading.Lock()
+            summary = None

             try:
                 lock.acquire(blocking=True, timeout=10)
@@ -189,6 +149,7 @@ def commit(self):
                         )
                     pytable = self.wal.arrow()
+                    summary = self.wal.profile.to_dicts()

                     # if we have a schema, make effort to align the parquet file to it
                     if self.schema:
@@ -207,6 +168,7 @@ def commit(self):
                 committed_blob_name = self.inner_writer.commit(
                     byte_data=bytes(self.buffer), override_blob_name=None
                 )
+                self.manifest[committed_blob_name] = summary

                 if "BACKOUT" in committed_blob_name:
                     get_logger().warning(
diff --git a/mabel/data/writers/internals/blob_writer_orso.py b/mabel/data/writers/internals/blob_writer_orso.py
deleted file mode 100644
index 343bd95..0000000
--- a/mabel/data/writers/internals/blob_writer_orso.py
+++ /dev/null
@@ -1,167 +0,0 @@
-import datetime
-import io
-import sys
-import threading
-
-import orso
-import zstandard
-from orso.logging import get_logger
-
-from mabel.data.validator import Schema
-from mabel.errors import MissingDependencyError
-
-BLOB_SIZE = 64 * 1024 * 1024  # 64Mb, 16 files per gigabyte
-SUPPORTED_FORMATS_ALGORITHMS = ("jsonl", "zstd", "parquet", "text", "flat")
-
-
-def get_size(obj, seen=None):
-    """
-    Recursively approximate the size of objects.
-    We don't know the actual size until we save, so we approximate the size based
-    on some rules - this will be wrong due to RLE, headers, precision and other
-    factors.
-    """
-    obj_type = type(obj)
-    obj_id = id(obj)
-
-    if seen is None:
-        seen = set()
-
-    if obj_id in seen:
-        return 0
-
-    if obj_type == int or obj_type == float:
-        size = 6
-    elif obj_type == bool:
-        size = 1
-    elif obj_type in {str, bytes, bytearray}:
-        size = len(obj) + 4
-    elif obj is None:  # NoneType is not directly accessible
-        size = 1
-    elif obj_type == datetime.datetime:
-        size = 8
-    else:
-        size = sys.getsizeof(obj)
-
-    # Mark as seen before entering recursion to gracefully handle
-    # self-referential objects
-    seen.add(obj_id)
-
-    if obj_type == dict:
-        size += sum(get_size(v, seen) for v in obj.values()) + 8
-    elif hasattr(obj, "__dict__"):
-        size += get_size(obj.__dict__, seen) + 8
-    elif hasattr(obj, "__iter__") and obj_type not in {str, bytes, bytearray}:
-        size += sum(get_size(i, seen) for i in obj) + 8
-
-    return size
-
-
-class BlobWriter(object):
-    wal = []
-
-    def __init__(
-        self,
-        *,  # force params to be named
-        inner_writer=None,  # type:ignore
-        blob_size: int = BLOB_SIZE,
-        format: str = "zstd",
-        schema: Schema = None,
-        **kwargs,
-    ):
-        self.format = format
-        self.maximum_blob_size = blob_size
-
-        if format not in SUPPORTED_FORMATS_ALGORITHMS:
-            raise ValueError(
-                f"Invalid format `{format}`, valid options are {SUPPORTED_FORMATS_ALGORITHMS}"
-            )
-
-        kwargs["format"] = format
-        self.inner_writer = inner_writer(**kwargs)  # type:ignore
-
-        self.schema = None
-        if isinstance(schema, (list, dict)):
-            schema = Schema(schema)
-        if isinstance(schema, Schema):
-            self.schema = schema
-        if self.schema is None:
-            self.schema = []
-            raise MissingDependencyError("Writers require a Schema")
-        self.schema = self.schema.schema
-
-        self.wal = orso.DataFrame(rows=[], schema=self.schema)
-        self.records_in_buffer = 0
-        self.byte_count = 5120
-
-    def append(self, record: dict):
-        self.wal.append(record)
-        self.records_in_buffer += 1
-        self.byte_count += get_size(record)
-        # if this write would exceed the blob size, close it
-        if self.byte_count >= self.maximum_blob_size:
-            self.commit()
-            self.wal = orso.DataFrame(rows=[], schema=self.schema)
-            self.records_in_buffer = 0
-            self.byte_count = 5120
-
-        return self.records_in_buffer
-
-    def commit(self):
-        committed_blob_name = ""
-
-        if len(self.wal) > 0:
-            lock = threading.Lock()
-
-            try:
-                lock.acquire(blocking=True, timeout=10)
-
-                if self.format == "parquet":
-                    try:
-                        import pyarrow.parquet
-                    except ImportError:
-                        raise MissingDependencyError(
-                            "`pyarrow` missing, please install or include in `requirements.txt`."
-                        )
-                    pytable = self.wal.arrow()
-
-                    tempfile = io.BytesIO()
-                    pyarrow.parquet.write_table(pytable, where=tempfile, compression="zstd")
-
-                    tempfile.seek(0)
-                    buffer = tempfile.read()
-
-                else:
-                    buffer = b"\n".join(r.as_json for r in self.wal) + b"\n"
-                    if self.format == "zstd":
-                        # zstandard is an non-optional installed dependency
-                        buffer = zstandard.compress(buffer)
-
-                committed_blob_name = self.inner_writer.commit(
-                    byte_data=bytes(buffer), override_blob_name=None
-                )
-
-                if "BACKOUT" in committed_blob_name:
-                    get_logger().warning(
-                        f"{self.records_in_buffer:n} failed records written to BACKOUT partition `{committed_blob_name}`"
-                    )
-                get_logger().debug(
-                    {
-                        "format": self.format,
-                        "committed_blob": committed_blob_name,
-                        "records": (
-                            len(buffer) if self.format == "parquet" else self.records_in_buffer
-                        ),
-                    }
-                )
-            finally:
-                lock.release()
-
-        self.wal = orso.DataFrame(rows=[], schema=self.schema)
-        self.records_in_buffer = 0
-        self.byte_count = 5120
-        return committed_blob_name
-
-    def __del__(self):
-        # this should never be relied on to save data
-        self.commit()
diff --git a/tests/storage_tests/disk/test_writer_disk_writer.py b/tests/storage_tests/disk/test_writer_disk_writer.py
index 5e31ea0..bd137c0 100644
--- a/tests/storage_tests/disk/test_writer_disk_writer.py
+++ b/tests/storage_tests/disk/test_writer_disk_writer.py
@@ -14,7 +14,6 @@
 from mabel.data.internals.dictset import STORAGE_CLASS


-
 def do_writer():
     w = BatchWriter(
         inner_writer=DiskWriter,
diff --git a/tests/test_writer_batch_writer.py b/tests/test_writer_batch_writer.py
index afdafd5..ce470d7 100644
--- a/tests/test_writer_batch_writer.py
+++ b/tests/test_writer_batch_writer.py
@@ -217,4 +217,5 @@ def get_data():
 if __name__ == "__main__":  # pragma: no cover
     from tests.helpers.runner import run_tests

+    test_writer_without_schema_parquet()
     run_tests()