
Commit 0.6.20
joocer committed Mar 16, 2024
1 parent 171219c commit 53ddd26
Showing 45 changed files with 64 additions and 45 deletions.
1 change: 1 addition & 0 deletions mabel/adapters/google/bigquery_writer.py
@@ -1,6 +1,7 @@
"""
BigQuery Writer
"""

from enum import Enum
from typing import Optional

1 change: 1 addition & 0 deletions mabel/adapters/google/google_cloud_storage_reader.py
@@ -1,6 +1,7 @@
"""
Google Cloud Storage Reader
"""

import os

from mabel.data.readers.internals.base_inner_reader import BaseInnerReader
1 change: 1 addition & 0 deletions mabel/adapters/null/null_writer.py
@@ -5,6 +5,7 @@
#nodoc - don't add to the documentation wiki
"""

from ...data.writers.internals.base_inner_writer import BaseInnerWriter


1 change: 1 addition & 0 deletions mabel/data/internals/dnf_filters.py
@@ -1,6 +1,7 @@
"""
This is a filtering mechanism to be applied when reading data.
"""

import operator
from typing import Iterable
from typing import List
@@ -8,6 +8,7 @@
If you're doing few operations on the the data and it is easily recreated or local
storage is fast, this isn't a good option.
"""

import gc
from typing import List
from typing import Set
1 change: 1 addition & 0 deletions mabel/data/internals/storage_classes/storage_class_disk.py
@@ -5,6 +5,7 @@
The Reader and Writer are pretty fast, the bottleneck is the parsing and serialization
of JSON data - this accounts for over 50% of the read/write times.
"""

import atexit
import mmap
import os
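
The claim in this module's docstring — that JSON parsing and serialization, not the disk itself, dominates read/write time — is easy to spot-check outside mabel. Below is a minimal, hypothetical timing sketch (not part of this commit); the record shape and counts are invented for illustration, and only orjson and the standard library are used.

```python
import os
import tempfile
import time

import orjson  # already a mabel dependency

# a synthetic payload of simple records (shape is illustrative only)
records = [{"id": i, "name": f"row-{i}", "value": i * 0.5} for i in range(100_000)]

t0 = time.monotonic()
encoded = [orjson.dumps(r) for r in records]      # serialization cost
t1 = time.monotonic()

with tempfile.NamedTemporaryFile(delete=False) as blob:
    blob.write(b"\n".join(encoded))               # raw disk write cost
    path = blob.name
t2 = time.monotonic()

with open(path, "rb") as blob:
    lines = blob.read().splitlines()              # raw disk read cost
t3 = time.monotonic()

decoded = [orjson.loads(line) for line in lines]  # deserialization cost
t4 = time.monotonic()

print(f"encode {t1 - t0:.3f}s  write {t2 - t1:.3f}s  read {t3 - t2:.3f}s  decode {t4 - t3:.3f}s")
os.remove(path)
```
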
1 change: 1 addition & 0 deletions mabel/data/readers/internals/base_inner_reader.py
@@ -1,6 +1,7 @@
"""
Base Inner Reader
"""

import abc
import datetime
import io
1 change: 1 addition & 0 deletions mabel/data/readers/internals/cursor.py
@@ -9,6 +9,7 @@
- location : the record in the active partition (blob), so we can resume reading
midway through the blob if required.
"""

import orjson
from orso.cityhash import CityHash64

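
The cursor described above is essentially a small, serializable position marker: which blob (partition) is active and how far through it the reader has progressed. The hunk only shows the module's imports (orjson and CityHash64), so the sketch below is an assumption about how such a marker could be represented and fingerprinted, not mabel's actual cursor class; the blob name is invented.

```python
import orjson
from orso.cityhash import CityHash64  # same imports as the module above

# hypothetical cursor state: the active partition (blob) and the offset within it
cursor = {
    "partition": "year_2024/month_03/day_16/data-0001.jsonl",  # invented blob name
    "location": 4096,  # record offset, so a read can resume midway through the blob
}

# a stable serialized form makes the cursor easy to hand back to the caller ...
token = orjson.dumps(cursor)

# ... and a content hash gives a cheap identity check for the cursor state
fingerprint = CityHash64(token)

restored = orjson.loads(token)
assert restored["location"] == 4096
print(fingerprint)
```
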
1 change: 1 addition & 0 deletions mabel/data/readers/internals/inline_functions.py
@@ -4,6 +4,7 @@
These are the function definitions, the processor which uses these is in the
'inline_evaluator' module.
"""

import datetime
from functools import lru_cache
from math import trunc
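
This module is described as a table of function definitions that a separate 'inline_evaluator' applies to field values. As a hedged illustration of that pattern only — not mabel's actual registry — a dict of named callables, with lru_cache on the expensive ones, might look like this:

```python
import datetime
from functools import lru_cache
from math import trunc


@lru_cache(maxsize=128)
def _year(value: str) -> int:
    # the same timestamp string is often evaluated repeatedly, so cache the parse
    return datetime.datetime.fromisoformat(value).year


# an evaluator can look functions up by name and apply them to field values
FUNCTIONS = {
    "YEAR": _year,
    "TRUNC": trunc,
    "UPPER": str.upper,
}

print(FUNCTIONS["YEAR"]("2024-03-16T12:00:00"))  # -> 2024
print(FUNCTIONS["TRUNC"](3.9))                   # -> 3
```
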
1 change: 1 addition & 0 deletions mabel/data/readers/internals/multiprocess_wrapper.py
@@ -5,6 +5,7 @@
When a reliable use case for multiprocessing is identified it may be included into the
automatic running of the data accesses.
"""

import datetime
import logging
import multiprocessing
1 change: 1 addition & 0 deletions mabel/data/readers/internals/threaded_wrapper.py
@@ -1,6 +1,7 @@
"""
"""

import logging
import threading
import time
2 changes: 1 addition & 1 deletion mabel/data/validator/__init__.py
@@ -15,7 +15,7 @@ def schema_loader(
) -> Union[RelationSchema, bool]:
if definition is None:
raise ValueError(
"Writer is missing a schema, minimum schema is a list of the columns or explicitly set to 'False' is data is unschamable."
"Writer is missing a schema, minimum schema is a list of the columns or explicitly set to 'False' is data is unschemable."
)

if definition is False:
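
Based only on the signature and the corrected error message in this hunk, schema_loader accepts a list of column names as a minimum schema, treats an explicit False as "this data cannot be schematised", and raises on None. A hypothetical usage sketch follows — it assumes the definition is the only required argument, which the truncated hunk does not confirm:

```python
from mabel.data.validator import schema_loader

# a minimum schema is just the list of column names
schema = schema_loader(["id", "name", "value"])

# None is rejected - the writer wants an explicit decision
try:
    schema_loader(None)
except ValueError as error:
    print(error)

# explicitly opting out for data that cannot be schematised
no_schema = schema_loader(False)
```
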
1 change: 1 addition & 0 deletions mabel/data/writers/batch_writer.py
@@ -132,6 +132,7 @@ def finalize(self, **kwargs):

completion_path = self.blob_writer.inner_writer.filename
completion_path = os.path.split(completion_path)[0] + "/frame.complete"
manifest_path = os.path.split(completion_path)[0] + "/manifest.json"
self.metadata["records"] = self.records
if self.schema:
if isinstance(self.schema, dict):
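
The new manifest_path sits alongside the existing frame.complete marker: both are derived from the inner writer's blob filename with os.path.split. A quick standalone check of that derivation (the starting blob path below is invented):

```python
import os

# hypothetical blob path produced by an inner writer
completion_path = "bucket/dataset/year_2024/month_03/data-0001.parquet"

completion_path = os.path.split(completion_path)[0] + "/frame.complete"
manifest_path = os.path.split(completion_path)[0] + "/manifest.json"

print(completion_path)  # bucket/dataset/year_2024/month_03/frame.complete
print(manifest_path)    # bucket/dataset/year_2024/month_03/manifest.json
```
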
1 change: 1 addition & 0 deletions mabel/data/writers/internals/base_inner_writer.py
@@ -5,6 +5,7 @@
The primary activity is contained in the .commit() method.
"""

import abc
import os
import time
50 changes: 18 additions & 32 deletions mabel/data/writers/internals/blob_writer.py
@@ -1,10 +1,12 @@
import datetime
import io
import json
import sys
import threading
from typing import Optional

import orjson
import orso
import zstandard
from orso.logging import get_logger
from orso.schema import RelationSchema
@@ -13,7 +15,8 @@
from mabel.data.validator import schema_loader
from mabel.errors import MissingDependencyError

BLOB_SIZE = 64 * 1024 * 1024 # 64Mb, 16 files per gigabyte
# we use 62Mb to allow for headers/footers and errors in calcs
BLOB_SIZE = 62 * 1024 * 1024 # 64Mb, 16 files per gigabyte
SUPPORTED_FORMATS_ALGORITHMS = ("jsonl", "zstd", "parquet", "text", "flat")
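
A quick sanity check of the new threshold (my arithmetic, not from the commit): 62 MiB leaves about 2 MiB of headroom per blob for headers, footers and estimation error, while a gigabyte still maps to roughly sixteen ~64 MiB blobs.

```python
BLOB_SIZE = 62 * 1024 * 1024            # 65,011,712 bytes
TARGET = 64 * 1024 * 1024               # 67,108,864 bytes

print(TARGET - BLOB_SIZE)               # 2,097,152 bytes (2 MiB) of headroom
print((1024 * 1024 * 1024) // TARGET)   # 16 blobs of ~64 MiB per GiB
```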


@@ -80,29 +83,24 @@ def __init__(

kwargs["format"] = format
self.inner_writer = inner_writer(**kwargs) # type:ignore

self.schema = schema_loader(schema)
self.open_buffer()

if self.format == "parquet":
self.append = self.arrow_append
else:
self.append = self.text_append

self.schema = schema_loader(schema)


def arrow_append(self, record: dict = {}):
record_length = get_size(record)
self.records_in_buffer += 1
self.wal.append(record) # type:ignore
# if this write would exceed the blob size, close it
if (
self.byte_count + record_length
) > self.maximum_blob_size and self.records_in_buffer > 0:
if (self.wal.nbytes()) > self.maximum_blob_size and self.records_in_buffer > 0:
self.commit()
self.open_buffer()

self.byte_count += record_length + 16
self.records_in_buffer += 1
self.buffer.append(record) # type:ignore

def text_append(self, record: dict = {}):
# serialize the record
if self.format == "text":
@@ -155,7 +153,7 @@ def _normalize_arrow_schema(self, table, mabel_schema: RelationSchema):
"BOOLEAN": pyarrow.bool_(),
"INTEGER": pyarrow.int64(),
"DOUBLE": pyarrow.float64(),
"ARRAY": pyarrow.list_(pyarrow.string())
"ARRAY": pyarrow.list_(pyarrow.string()),
# "STRUCT": pyarrow.map_(pyarrow.string(), pyarrow.string())
}

@@ -175,7 +173,7 @@ def _normalize_arrow_schema(self, table, mabel_schema: RelationSchema):
def commit(self):
committed_blob_name = ""

if len(self.buffer) > 0:
if self.records_in_buffer > 0:
lock = threading.Lock()

try:
@@ -190,24 +188,13 @@
"`pyarrow` is missing, please install or include in requirements.txt"
)

import io

tempfile = io.BytesIO()

# Add in any columns from the schema
columns = sorted(self.schema.column_names)

# then we make sure each row has all the columns
self.buffer = [
{column: row.get(column) for column in columns} for row in self.buffer
]

pytable = pyarrow.Table.from_pylist(self.buffer)
pytable = self.wal.arrow()

# if we have a schema, make effort to align the parquet file to it
if self.schema:
pytable = self._normalize_arrow_schema(pytable, self.schema)

tempfile = io.BytesIO()
pyarrow.parquet.write_table(pytable, where=tempfile, compression="zstd")

tempfile.seek(0)
@@ -229,10 +216,10 @@
{
"format": self.format,
"committed_blob": committed_blob_name,
"records": len(self.buffer)
if self.format == "parquet"
else self.records_in_buffer,
"bytes": self.byte_count if self.format == "parquet" else len(self.buffer),
"records": (
len(self.buffer) if self.format == "parquet" else self.records_in_buffer
),
"bytes": len(self.buffer),
}
)
finally:
@@ -243,8 +230,7 @@

def open_buffer(self):
if self.format == "parquet":
self.buffer = []
self.byte_count = 5120 # parquet has headers etc
self.wal = orso.DataFrame(rows=[], schema=self.schema)
else:
self.buffer = bytearray()
self.byte_count = 0
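
Taken together, the changes to this file replace a hand-rolled byte estimate with an orso DataFrame acting as a write-ahead buffer, but the overall pattern is unchanged: append records to an in-memory buffer and, when the buffered size would exceed the blob limit, flush the buffer as a Parquet blob and start a new one. The sketch below is a simplified, hypothetical illustration of that rotation pattern using pyarrow directly; it is not the mabel implementation (no orso WAL, no inner writers, no locking), and the size estimate is deliberately crude.

```python
import io

import pyarrow
import pyarrow.parquet

BLOB_SIZE = 62 * 1024 * 1024  # same threshold as above


class TinyBlobBuffer:
    """Size-bounded record buffer that rotates into Parquet blobs."""

    def __init__(self):
        self.buffer = []
        self.byte_count = 0
        self.blobs = []  # committed blobs, kept in memory for this demo

    def append(self, record: dict):
        size_estimate = len(str(record)) + 16  # crude per-record size estimate
        # if this write would push the blob past the limit, commit first
        if self.buffer and self.byte_count + size_estimate > BLOB_SIZE:
            self.commit()
        self.buffer.append(record)
        self.byte_count += size_estimate

    def commit(self):
        if not self.buffer:
            return
        table = pyarrow.Table.from_pylist(self.buffer)
        blob = io.BytesIO()
        pyarrow.parquet.write_table(table, where=blob, compression="zstd")
        blob.seek(0)
        self.blobs.append(blob.getvalue())
        self.buffer = []
        self.byte_count = 0


writer = TinyBlobBuffer()
for i in range(1_000):
    writer.append({"id": i, "value": i * 0.5})
writer.commit()  # flush whatever is left in the buffer
print(len(writer.blobs), "blob(s) written")
```
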
6 changes: 3 additions & 3 deletions mabel/data/writers/internals/blob_writer_orso.py
@@ -149,9 +149,9 @@ def commit(self):
{
"format": self.format,
"committed_blob": committed_blob_name,
"records": len(buffer)
if self.format == "parquet"
else self.records_in_buffer,
"records": (
len(buffer) if self.format == "parquet" else self.records_in_buffer
),
}
)
finally:
1 change: 1 addition & 0 deletions mabel/data/writers/internals/writer_pool.py
@@ -7,6 +7,7 @@
can be more active BlobWriters, this is used to determine how many
BlobWriters to recommend for evict.
"""

import threading
import time

6 changes: 3 additions & 3 deletions mabel/data/writers/writer.py
@@ -103,9 +103,9 @@ def __init__(

arg_dict = kwargs.copy()
arg_dict["dataset"] = f"{self.dataset}"
arg_dict[
"inner_writer"
] = f"{arg_dict.get('inner_writer', type(None)).__name__}" # type:ignore
arg_dict["inner_writer"] = (
f"{arg_dict.get('inner_writer', type(None)).__name__}"
) # type:ignore
logger.debug(orjson.dumps(arg_dict))

# add the schema to the writer - pyarrow uses this
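
The change above is only black's new formatting for the same statement: the inner writer's class name is stringified into the argument dict, which is then serialized with orjson for a debug log line. One detail worth keeping in mind (my note, not from the diff) is that orjson.dumps returns bytes, so a plain-text logger may want an explicit decode:

```python
import logging

import orjson

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("writer")

# illustrative values only - mabel builds this dict from the Writer's kwargs
arg_dict = {"dataset": "my_dataset", "inner_writer": "GoogleCloudStorageWriter"}

# orjson.dumps returns bytes; decode it for a readable log message
logger.debug(orjson.dumps(arg_dict).decode())
```
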
1 change: 1 addition & 0 deletions mabel/errors/render_error_stack.py
@@ -8,6 +8,7 @@
#nodoc - don't add to the documentation wiki
"""

import os.path
import pathlib
import sys
1 change: 1 addition & 0 deletions mabel/utils/entropy.py
@@ -17,6 +17,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import base64
import os
from math import ceil
1 change: 1 addition & 0 deletions mabel/utils/paths.py
@@ -1,6 +1,7 @@
"""
Functions to help with handling file paths
"""

import datetime
import errno
import os
2 changes: 0 additions & 2 deletions pyproject.toml
@@ -2,8 +2,6 @@
line-length = 100
target-version = ['py310']
fast = true
jobs = 4
cache = true

[tool.isort]
profile = "black"
1 change: 1 addition & 0 deletions tests/NFT/currency.py
@@ -15,6 +15,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

"""
A simple script to determine the bill of materials in terms of installed
pypi packages and then compare the installed version to the latest
1 change: 1 addition & 0 deletions tests/NFT/maintainability.py
@@ -11,6 +11,7 @@
Radon itself will A grade for maintainability for scores 100 to 20, this
script sets the bar at 50.
"""

import radon.metrics # type:ignore
import logging
import glob
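
The gate described here — radon's maintainability index with a pass mark of 50 rather than radon's own A-grade floor of 20 — can be reproduced in a few lines. This is a hedged sketch, not the test's actual code; the file glob and the reporting are assumptions:

```python
import glob

import radon.metrics  # type:ignore

THRESHOLD = 50  # stricter than radon's A-grade floor of 20

for path in glob.glob("mabel/**/*.py", recursive=True):
    with open(path, "r", encoding="utf-8") as source_file:
        source = source_file.read()
    score = radon.metrics.mi_visit(source, True)  # maintainability index, 0-100
    if score < THRESHOLD:
        print(f"{path}: maintainability {score:.1f} is below {THRESHOLD}")
```
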
1 change: 1 addition & 0 deletions tests/performance/avro_performance.py
@@ -8,6 +8,7 @@
outcome: stay with the simple JSONL format.
"""

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
1 change: 1 addition & 0 deletions tests/performance/compression_performance.py
@@ -11,6 +11,7 @@
Results indicate that if you're IO bound (you almost definitely are) that the
effort to compress is paid back many times by the effort to IO.
"""

import os
import time
import lzma
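
The conclusion recorded in this docstring — that compression effort is repaid many times over by reduced IO — is easy to spot-check with the zstandard library mabel already uses. A minimal, hypothetical comparison (the payload is invented and deliberately repetitive, so real ratios will differ):

```python
import time

import zstandard

# a compressible, JSONL-like payload
payload = b"\n".join(b'{"id": %d, "name": "row", "value": 0.5}' % i for i in range(100_000))

start = time.monotonic()
compressed = zstandard.ZstdCompressor().compress(payload)
elapsed = time.monotonic() - start

print(f"compressed to {len(compressed) / len(payload):.1%} of original in {elapsed:.3f}s")

# round-trip check
assert zstandard.ZstdDecompressor().decompress(compressed) == payload
```
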
1 change: 1 addition & 0 deletions tests/performance/index_performance.py
@@ -9,6 +9,7 @@
no | no | 0.332
-------------------------------
"""

import os, sys

sys.path.insert(1, os.path.join(sys.path[0], "../.."))
1 change: 1 addition & 0 deletions tests/performance/indexing.py
@@ -1,6 +1,7 @@
"""
"""

import sys
import os

1 change: 1 addition & 0 deletions tests/performance/json_performance.py
@@ -14,6 +14,7 @@
-------------------------------
"""

import time


1 change: 1 addition & 0 deletions tests/performance/validator_performance.py
@@ -6,6 +6,7 @@
│ pydantic │ 2.1748749800000002 │
└──────────┴────────────────────┘
"""

import time
import pydantic
import datetime
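
The timing table above is truncated, but a comparable measurement for the pydantic row takes only a few lines. A hedged sketch follows — the model shape and record count are invented, so absolute numbers will not match the table:

```python
import datetime
import time

import pydantic


class Record(pydantic.BaseModel):
    id: int
    name: str
    timestamp: datetime.datetime


rows = [
    {"id": i, "name": f"row-{i}", "timestamp": "2024-03-16T12:00:00"}
    for i in range(100_000)
]

start = time.monotonic()
for row in rows:
    Record(**row)  # validation happens on construction
print(f"pydantic: {time.monotonic() - start:.3f}s")
```
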
1 change: 1 addition & 0 deletions tests/performance/writer_performance.py
@@ -15,6 +15,7 @@
└─────────────┴────────────┴────────┴───────┴─────────────┘
Your numbers will vary depending on any number of factors
"""

import sys
import os
import time
1 change: 1 addition & 0 deletions tests/storage_tests/disk/test_reader_disk_reader.py
@@ -1,6 +1,7 @@
"""
Test the file reader
"""

import datetime
import pytest
import os