Skip to content

Commit

Permalink
MongoDB Full: Polars' read_ndjson doesn't load MongoDB JSON data well
Browse files Browse the repository at this point in the history
Use `fsspec` and `orjson` instead.
  • Loading branch information
amotl committed Sep 22, 2024
1 parent 6b548b8 commit 7155c75
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Table Loader: Improved conditional handling of "transformation" parameter
- Table Loader: Improved status reporting and error logging in `BulkProcessor`
- MongoDB: Improve error reporting
- MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well,
use `fsspec` and `orjson` instead

## 2024/09/19 v0.0.24
- MongoDB Full: Refactor transformation subsystem to `commons-codec`
Expand Down
25 changes: 13 additions & 12 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pathlib import Path

import boltons.urlutils
import polars as pl
import bson
import pymongo
import yarl
from attrs import define, field
Expand All @@ -18,6 +18,7 @@

from cratedb_toolkit.io.mongodb.util import batches
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.io import read_json

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -106,13 +107,13 @@ def query(self):
if not self._path.exists():
raise FileNotFoundError(f"Resource not found: {self._path}")
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using filter expressions is not supported by filesystem adapter")
if self.limit:
raise NotImplementedError("Using limit parameter is not supported by filesystem adapter")
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using offset parameter is not supported by filesystem adapter")
if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
data = pl.read_ndjson(
self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
data = read_json(str(self._path))
elif ".bson" in str(self._path):
data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
else:
Expand All @@ -137,15 +138,15 @@ def record_count(self, filter_=None) -> int:

def query(self):
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using filter expressions is not supported by remote resource adapter")
if self.limit:
raise NotImplementedError("Using limit parameter is not supported by remote resource adapter")
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using offset parameter is not supported by remote resource adapter")
if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
data = pl.read_ndjson(
str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
data = read_json(str(self._url))
elif self._url.path.endswith(".bson"):
raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC")
raise NotImplementedError("HTTP+BSON loader does not support .bson files yet.")
else:
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)
Expand Down
5 changes: 4 additions & 1 deletion cratedb_toolkit/io/mongodb/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
import typing as t

from commons_codec.transform.mongodb import Document
from pymongo.cursor import Cursor

from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents
Expand Down Expand Up @@ -44,7 +45,9 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict:
return d


def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]:
def batches(
data: t.Union[Cursor, Documents, t.Generator[Document, None, None]], batch_size: int = 100
) -> t.Generator[Documents, None, None]:
"""
Generate batches of documents.
"""
Expand Down
15 changes: 15 additions & 0 deletions cratedb_toolkit/util/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import orjsonl
from fsspec import filesystem
from upath import UPath


def read_json(url: str):
    """
    Stream JSON/NDJSON records from a local or remote resource.

    The URL is resolved through `fsspec` via `UPath`, so any protocol
    fsspec supports works here (``file://``, ``s3://``, ``http://``, ...).
    Records are decoded lazily, one NDJSON line at a time, by `orjsonl`.

    :param url: Location of the resource, in any fsspec-supported notation.
    :return: Generator yielding one decoded JSON record per NDJSON line.
    """
    p = UPath(url)
    fs = filesystem(p.protocol, **p.storage_options)  # equivalent to p.fs
    fp = fs.open(p.path)
    # Ensure the file handle is released once the consumer has exhausted
    # (or abandoned) the stream — answers the original "does the resource
    # need to be closed?" TODO. Note: because this is now a generator
    # function, opening happens on first iteration, not at call time.
    try:
        yield from orjsonl.stream(fp)
    finally:
        fp.close()
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ dependencies = [
"cratedb-sqlparse==0.0.6",
'importlib-metadata; python_version < "3.8"',
'importlib-resources; python_version < "3.9"',
"orjsonl<2",
"polars<1.7",
"pympler<1.2",
"python-dateutil<3",
Expand Down Expand Up @@ -154,8 +155,10 @@ influxdb = [
io = [
"cr8",
"dask[dataframe]>=2020",
"fsspec[s3,http]",
"pandas<3,>=1",
"sqlalchemy>=2",
"universal-pathlib<0.3",
]
kinesis = [
"aiobotocore<2.16",
Expand Down

0 comments on commit 7155c75

Please sign in to comment.