diff --git a/CHANGES.md b/CHANGES.md
index 8265cbf..134cac1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,8 @@
 - Table Loader: Improved conditional handling of "transformation" parameter
 - Table Loader: Improved status reporting and error logging in `BulkProcessor`
 - MongoDB: Improve error reporting
+- MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well,
+  use `fsspec` and `orjson` instead

 ## 2024/09/19 v0.0.24
 - MongoDB Full: Refactor transformation subsystem to `commons-codec`
diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py
index 9163321..8a4629d 100644
--- a/cratedb_toolkit/io/mongodb/adapter.py
+++ b/cratedb_toolkit/io/mongodb/adapter.py
@@ -8,7 +8,7 @@
 from pathlib import Path

 import boltons.urlutils
-import polars as pl
+import bson
 import pymongo
 import yarl
 from attrs import define, field
@@ -18,6 +18,7 @@
 from cratedb_toolkit.io.mongodb.util import batches
 from cratedb_toolkit.model import DatabaseAddress
+from cratedb_toolkit.util.io import read_json

 logger = logging.getLogger(__name__)
@@ -106,13 +107,13 @@ def query(self):
         if not self._path.exists():
             raise FileNotFoundError(f"Resource not found: {self._path}")
         if self.filter:
-            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
+            raise NotImplementedError("Using filter expressions is not supported by filesystem adapter")
+        if self.limit:
+            raise NotImplementedError("Using limit parameter is not supported by filesystem adapter")
         if self.offset:
-            raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
+            raise NotImplementedError("Using offset parameter is not supported by filesystem adapter")
         if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
-            data = pl.read_ndjson(
-                self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
-            ).to_dicts()
+            data = read_json(str(self._path))
         elif ".bson" in str(self._path):
             data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
         else:
@@ -137,15 +138,15 @@ def record_count(self, filter_=None) -> int:

     def query(self):
         if self.filter:
-            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
+            raise NotImplementedError("Using filter expressions is not supported by remote resource adapter")
+        if self.limit:
+            raise NotImplementedError("Using limit parameter is not supported by remote resource adapter")
         if self.offset:
-            raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
+            raise NotImplementedError("Using offset parameter is not supported by remote resource adapter")
         if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
-            data = pl.read_ndjson(
-                str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
-            ).to_dicts()
+            data = read_json(str(self._url))
         elif self._url.path.endswith(".bson"):
-            raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC")
+            raise NotImplementedError("HTTP+BSON loader does not support .bson files yet.")
         else:
             raise ValueError(f"Unsupported file type: {self._url}")
         return batches(data, self.batch_size)
diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py
index 0aed69a..0df03b5 100644
--- a/cratedb_toolkit/io/mongodb/util.py
+++ b/cratedb_toolkit/io/mongodb/util.py
@@ -1,6 +1,7 @@
 import re
 import typing as t

+from commons_codec.transform.mongodb import Document
 from pymongo.cursor import Cursor

 from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents
@@ -44,7 +45,9 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict:
     return d


-def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]:
+def batches(
+    data: t.Union[Cursor, Documents, t.Generator[Document, None, None]], batch_size: int = 100
+) -> t.Generator[Documents, None, None]:
     """
     Generate batches of documents.
     """
diff --git a/cratedb_toolkit/util/io.py b/cratedb_toolkit/util/io.py
new file mode 100644
index 0000000..f52fc43
--- /dev/null
+++ b/cratedb_toolkit/util/io.py
@@ -0,0 +1,15 @@
+import orjsonl
+from fsspec import filesystem
+from upath import UPath
+
+
+def read_json(url: str):
+    """
+    Read JSON file from anywhere.
+    TODO: Does the resource need to be closed? How?
+    """
+    p = UPath(url)
+    fs = filesystem(p.protocol, **p.storage_options)  # equivalent to p.fs
+    fp = fs.open(p.path)
+    data = orjsonl.stream(fp)
+    return data
diff --git a/pyproject.toml b/pyproject.toml
index af3c1ff..1c90caa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -94,6 +94,7 @@ dependencies = [
   "cratedb-sqlparse==0.0.6",
   'importlib-metadata; python_version < "3.8"',
   'importlib-resources; python_version < "3.9"',
+  "orjsonl<2",
   "polars<1.7",
   "pympler<1.2",
   "python-dateutil<3",
@@ -154,8 +155,10 @@ influxdb = [
 io = [
   "cr8",
   "dask[dataframe]>=2020",
+  "fsspec[s3,http]",
   "pandas<3,>=1",
   "sqlalchemy>=2",
+  "universal-pathlib<0.3",
 ]
 kinesis = [
   "aiobotocore<2.16",