
0.6.20
joocer committed Mar 17, 2024
1 parent 53ddd26 commit be69b39
Showing 5 changed files with 11 additions and 215 deletions.
7 changes: 4 additions & 3 deletions mabel/data/writers/batch_writer.py
@@ -132,13 +132,14 @@ def finalize(self, **kwargs):

completion_path = self.blob_writer.inner_writer.filename
completion_path = os.path.split(completion_path)[0] + "/frame.complete"
manifest_path = os.path.split(completion_path)[0] + "/manifest.json"
self.metadata["records"] = self.records
self.metadata["format"] = self.blob_writer.format
if self.schema:
if isinstance(self.schema, dict):
self.metadata["schema"] = self.schema
elif hasattr(self.schema, "definition"):
self.metadata["schema"] = self.schema.definition
elif hasattr(self.schema, "to_dict"):
self.metadata["schema"] = self.schema.to_dict()
self.metadata["manifest"] = self.blob_writer.manifest
flag = self.blob_writer.inner_writer.commit(
byte_data=orjson.dumps(self.metadata, option=orjson.OPT_INDENT_2),
override_blob_name=completion_path,
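For orientation, the finalize change above folds the new blob manifest into the frame completion metadata. A hypothetical sketch of the resulting frame.complete document - the keys follow the code above, the values are invented:

```python
import orjson

# Illustrative values only; "manifest" maps each committed blob name to the
# column-profile summary captured at commit time (see blob_writer.py below).
metadata = {
    "records": 100000,
    "format": "parquet",
    "schema": {"fields": [{"name": "id", "type": "INTEGER"}]},  # assumed shape
    "manifest": {
        "bucket/dataset/year_2024/month_03/day_17/data.parquet": [
            {"name": "id", "count": 100000, "missing": 0},  # hypothetical profile fields
        ],
    },
}
payload = orjson.dumps(metadata, option=orjson.OPT_INDENT_2)
```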
50 changes: 6 additions & 44 deletions mabel/data/writers/internals/blob_writer.py
@@ -1,7 +1,5 @@
import datetime
import io
import json
import sys
import threading
from typing import Optional

@@ -20,49 +18,12 @@
SUPPORTED_FORMATS_ALGORITHMS = ("jsonl", "zstd", "parquet", "text", "flat")


def get_size(obj, seen=None):
    """
    Recursively approximate the size of objects.
    We don't know the actual size until we save, so we approximate it based on
    some rules - this will be wrong due to RLE, headers, precision and other
    factors.
    """
    size = sys.getsizeof(obj)

    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0

    if isinstance(obj, (int, float)):
        size = 6  # probably 4 bytes, could be 8
    if isinstance(obj, bool):
        size = 1
    if isinstance(obj, (str, bytes, bytearray)):
        size = len(obj) + 4
    if obj is None:
        size = 1
    if isinstance(obj, datetime.datetime):
        size = 8

    # Important: mark as seen *before* entering recursion to gracefully handle
    # self-referential objects
    seen.add(obj_id)
    if isinstance(obj, dict):
        size = sum([get_size(v, seen) for v in obj.values()]) + 8
    elif hasattr(obj, "__dict__"):
        size += get_size(obj.__dict__, seen) + 8
    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
        size += sum([get_size(i, seen) for i in obj]) + 8
    return size
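To make the deleted approximation concrete, a standalone example applying the rules above (dict values are sized individually, keys are ignored, and the dict adds a flat 8 bytes):

```python
record = {"id": 1, "name": "alice", "active": True}

# int -> 6, five-character str -> 5 + 4 = 9, bool -> 1, dict overhead -> 8
print(get_size(record))  # 24
```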


class BlobWriter(object):
    # in some failure scenarios commit is called before __init__, so we need
    # to define these variables outside __init__.
    buffer = bytearray()
    byte_count = 0
    manifest = {}
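The class-level buffer, byte_count, and manifest defaults exist so that commit() can run even on an instance whose __init__ never completed. A minimal sketch of that failure mode, outside the mabel codebase:

```python
class Fragile:
    buffer = bytearray()  # class-level default, as in BlobWriter

    def __init__(self):
        raise RuntimeError("writer setup failed")

w = object.__new__(Fragile)  # the instance exists, but __init__ never ran
print(len(w.buffer))         # 0 - resolved via the class attribute, no AttributeError
```

The usual caveat applies: mutable class attributes are shared across instances until an instance assignment shadows them.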

def __init__(
self,
@@ -91,13 +52,11 @@ def __init__(
else:
self.append = self.text_append



def arrow_append(self, record: dict = {}):
self.records_in_buffer += 1
self.wal.append(record) # type:ignore
# if this write would exceed the blob size, close it
- if (self.wal.nbytes()) > self.maximum_blob_size and self.records_in_buffer > 0:
+ if self.wal.nbytes() > self.maximum_blob_size:
self.commit()
self.open_buffer()

Expand All @@ -122,7 +81,7 @@ def text_append(self, record: dict = {}):

# the newline isn't counted, so add 1 to get the actual length; if this write
# would exceed the blob size, close it so another blob will be created
- if len(self.buffer) > self.maximum_blob_size and self.records_in_buffer > 0:
+ if len(self.buffer) > self.maximum_blob_size:
self.commit()
self.open_buffer()
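Both arrow_append and text_append follow the same rotate-on-overflow shape, and the record counter is incremented before the size check, which is why the old records_in_buffer > 0 guard was redundant. A condensed, self-contained sketch of the pattern - not the mabel implementation:

```python
import orjson

class RotatingWriter:
    def __init__(self, maximum_blob_size: int = 64 * 1024 * 1024):
        self.maximum_blob_size = maximum_blob_size
        self.open_buffer()

    def open_buffer(self):
        self.buffer = bytearray()
        self.records_in_buffer = 0

    def commit(self):
        # stand-in for handing the blob to an inner writer
        print(f"committing {len(self.buffer)} bytes, {self.records_in_buffer} records")

    def append(self, record: dict):
        self.records_in_buffer += 1
        self.buffer.extend(orjson.dumps(record) + b"\n")
        # records_in_buffer was just incremented, so it is always > 0 here;
        # the size check alone decides when to rotate to a new blob
        if len(self.buffer) > self.maximum_blob_size:
            self.commit()
            self.open_buffer()
```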

@@ -176,6 +135,7 @@ def commit(self):
if self.records_in_buffer > 0:
lock = threading.Lock()

summary = None
try:
lock.acquire(blocking=True, timeout=10)

@@ -189,6 +149,7 @@
)

pytable = self.wal.arrow()
summary = self.wal.profile.to_dicts()

# if we have a schema, make an effort to align the parquet file to it
if self.schema:
@@ -207,6 +168,7 @@
committed_blob_name = self.inner_writer.commit(
byte_data=bytes(self.buffer), override_blob_name=None
)
self.manifest[committed_blob_name] = summary

if "BACKOUT" in committed_blob_name:
get_logger().warning(
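Taken together, these changes mean every committed blob leaves its column-profile summary in the writer's manifest. A hedged sketch of reading it back, assuming a BlobWriter instance named writer and manifest values as produced by profile.to_dicts() above:

```python
for blob_name, column_profiles in writer.manifest.items():
    for column in column_profiles or []:  # summary may be None for non-parquet blobs
        print(blob_name, column)
```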
167 changes: 0 additions & 167 deletions mabel/data/writers/internals/blob_writer_orso.py

This file was deleted.

1 change: 0 additions & 1 deletion tests/storage_tests/disk/test_writer_disk_writer.py
@@ -14,7 +14,6 @@
from mabel.data.internals.dictset import STORAGE_CLASS



def do_writer():
w = BatchWriter(
inner_writer=DiskWriter,
1 change: 1 addition & 0 deletions tests/test_writer_batch_writer.py
@@ -217,4 +217,5 @@ def get_data():
if __name__ == "__main__": # pragma: no cover
from tests.helpers.runner import run_tests

test_writer_without_schema_parquet()
run_tests()
