diff --git a/CHANGES.md b/CHANGES.md index acbc160..762444c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ - Zyp/Moksha/jq: `to_object` function now respects a `zap` option, that removes the element altogether if it's empty - Zyp/Moksha/jq: Improve error reporting at `MokshaTransformation.apply` +- MongoDB: Improved `MongoDBCrateDBConverter.decode_extended_json` ## 2024/09/22 v0.0.17 - MongoDB: Fixed edge case when decoding MongoDB Extended JSON elements diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 42323fd..c94efe9 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -6,6 +6,7 @@ import datetime as dt import logging import typing as t +from functools import cached_property from typing import Iterable import bson @@ -24,18 +25,6 @@ logger = logging.getLogger(__name__) -def date_converter(value): - if isinstance(value, int): - return value - elif isinstance(value, (str, bytes)): - datetime = dateparser.parse(value) - elif isinstance(value, dt.datetime): - datetime = value - else: - raise ValueError(f"Unable to convert datetime value: {value}") - return calendar.timegm(datetime.utctimetuple()) * 1000 - - @define class MongoDBCrateDBConverter: """ @@ -44,6 +33,9 @@ class MongoDBCrateDBConverter: Extracted from cratedb-toolkit, earlier migr8. """ + timestamp_to_epoch: bool = False + timestamp_to_iso8601: bool = False + timestamp_use_milliseconds: bool = False transformation: t.Any = None def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]: @@ -73,7 +65,7 @@ def decode_value(self, value: t.Any) -> t.Any: if isinstance(value, dict): # Decode item in BSON CANONICAL format. if len(value) == 1 and next(iter(value)).startswith("$"): - return self.decode_canonical(value) + return self.decode_extended_json(value) # Custom adjustments to compensate shape anomalies in source data. # TODO: Review if it can be removed or refactored. @@ -111,33 +103,82 @@ def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]: """ return _json_convert(item) - @staticmethod - def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any: + def decode_extended_json(self, value: t.Dict[str, t.Any]) -> t.Any: """ - Decode MongoDB Extended JSON CANONICAL representation. + Decode MongoDB Extended JSON representation, canonical and legacy variants. """ - type_ = list(value.keys())[0] + + out: t.Any + # Special handling for datetime representation in NUMBERLONG format (emulated depth-first). + type_ = next(iter(value)) # Get key of first item in dictionary. is_date_numberlong = type_ == "$date" and "$numberLong" in value["$date"] if is_date_numberlong: - return int(object_hook(value["$date"])) + out = dt.datetime.fromtimestamp(int(value["$date"]["$numberLong"]) / 1000, tz=dt.UTC) else: - value = object_hook(value) - is_bson = type(value).__module__.startswith("bson") - - out: t.Any = value - if isinstance(value, bson.Binary) and value.subtype == bson.UUID_SUBTYPE: - out = value.as_uuid() - elif isinstance(value, bson.Binary): - out = base64.b64encode(value).decode() - elif isinstance(value, bson.Timestamp): - out = value.as_datetime() + out = object_hook(value) + + is_bson = isinstance(out, self.all_bson_types) + + # Decode BSON types. + if isinstance(out, bson.Binary) and out.subtype == bson.UUID_SUBTYPE: + out = out.as_uuid() + elif isinstance(out, bson.Binary): + out = base64.b64encode(out).decode() + elif isinstance(out, bson.Timestamp): + out = out.as_datetime() + + # Decode Python types. if isinstance(out, dt.datetime): - return date_converter(out) + if self.timestamp_to_epoch: + out = self.convert_epoch(out) + if self.timestamp_use_milliseconds: + out *= 1000 + return out + elif self.timestamp_to_iso8601: + return self.convert_iso8601(out) + + # Wrap up decoded BSON types as strings. if is_bson: return str(out) + + # Return others converted as-is. return out + @cached_property + def all_bson_types(self) -> t.Tuple[t.Type, ...]: + _types: t.List[t.Type] = [] + for _typ in bson._ENCODERS: + if hasattr(_typ, "_type_marker"): + _types.append(_typ) + return tuple(_types) + + @staticmethod + def convert_epoch(value: t.Any) -> float: + if isinstance(value, int): + return value + elif isinstance(value, dt.datetime): + datetime = value + elif isinstance(value, (str, bytes)): + datetime = dateparser.parse(value) + else: + raise ValueError(f"Unable to convert datetime value: {value}") + return calendar.timegm(datetime.utctimetuple()) + + @staticmethod + def convert_iso8601(value: t.Any) -> str: + if isinstance(value, str): + return value + elif isinstance(value, dt.datetime): + datetime = value + elif isinstance(value, bytes): + return value.decode("utf-8") + elif isinstance(value, int): + datetime = dt.datetime.fromtimestamp(value, tz=dt.UTC) + else: + raise ValueError(f"Unable to convert datetime value: {value}") + return datetime.isoformat() + def apply_special_treatments(self, value: t.Any): """ Apply special treatments to value that can't be described otherwise up until now. @@ -176,7 +217,7 @@ class MongoDBTranslatorBase: def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None): self.table_name = quote_relation_name(table_name) - self.converter = converter or MongoDBCrateDBConverter() + self.converter = converter or MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True) @property def sql_ddl(self): diff --git a/tests/transform/mongodb/test_mongodb_convert.py b/tests/transform/mongodb/test_mongodb_convert.py index fa3e58b..2c08a7f 100644 --- a/tests/transform/mongodb/test_mongodb_convert.py +++ b/tests/transform/mongodb/test_mongodb_convert.py @@ -1,35 +1,67 @@ # ruff: noqa: E402 +import datetime as dt +import typing as t + import pytest +from attrs import define pytestmark = pytest.mark.mongodb -from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter +from commons_codec.transform.mongodb import MongoDBCrateDBConverter from zyp.model.bucket import BucketTransformation, ValueConverter from zyp.model.collection import CollectionTransformation from zyp.model.treatment import Treatment +convert_epoch = MongoDBCrateDBConverter.convert_epoch +convert_iso8601 = MongoDBCrateDBConverter.convert_iso8601 + -def test_date_converter_int(): +def test_epoch_ms_converter_int(): """ Datetime values encoded as integer values will be returned unmodified. """ - assert date_converter(1443004362000) == 1443004362000 + assert convert_epoch(1443004362) == 1443004362 + assert convert_epoch(1443004362987) == 1443004362987 -def test_date_converter_iso8601(): +def test_epoch_ms_converter_iso8601(): """ Datetime values encoded as ISO8601 values will be parsed. """ - assert date_converter("2015-09-23T10:32:42.33Z") == 1443004362000 - assert date_converter(b"2015-09-23T10:32:42.33Z") == 1443004362000 + assert convert_epoch("2015-09-23T10:32:42.33Z") == 1443004362 + assert convert_epoch(b"2015-09-23T10:32:42.33Z") == 1443004362 -def test_date_converter_invalid(): +def test_epoch_ms_converter_invalid(): """ Incorrect datetime values will not be parsed. """ with pytest.raises(ValueError) as ex: - date_converter(None) + convert_epoch(None) + assert ex.match("Unable to convert datetime value: None") + + +def test_iso8601_converter_int(): + """ + Datetime values encoded as integer values will be returned unmodified. + """ + assert convert_iso8601(1443004362) == "2015-09-23T10:32:42+00:00" + + +def test_iso8601_converter_iso8601(): + """ + Datetime values encoded as ISO8601 values will be parsed. + """ + assert convert_iso8601("2015-09-23T10:32:42.33Z") == "2015-09-23T10:32:42.33Z" + assert convert_iso8601(b"2015-09-23T10:32:42.33Z") == "2015-09-23T10:32:42.33Z" + + +def test_iso8601_converter_invalid(): + """ + Incorrect datetime values will not be parsed. + """ + with pytest.raises(ValueError) as ex: + convert_iso8601(None) assert ex.match("Unable to convert datetime value: None") @@ -56,6 +88,65 @@ def test_convert_basic(): assert list(converter.decode_documents([data_in])) == [data_out] +@define +class DateConversionCase: + converter: t.Callable + data_in: t.Any + data_out: t.Any + + +testdata = [ + DateConversionCase( + converter=MongoDBCrateDBConverter(), + data_in={"$date": "2015-09-23T10:32:42.123456Z"}, + data_out=dt.datetime(2015, 9, 23, 10, 32, 42, 123456), + ), + DateConversionCase( + converter=MongoDBCrateDBConverter(), + data_in={"$date": {"$numberLong": "1655210544987"}}, + data_out=dt.datetime(2022, 6, 14, 12, 42, 24, 987000, tzinfo=dt.UTC), + ), + DateConversionCase( + converter=MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True), + data_in={"$date": "2015-09-23T10:32:42.123456Z"}, + data_out=1443004362000, + ), + DateConversionCase( + converter=MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True), + data_in={"$date": {"$numberLong": "1655210544987"}}, + data_out=1655210544000, + ), + DateConversionCase( + converter=MongoDBCrateDBConverter(timestamp_to_iso8601=True), + data_in={"$date": "2015-09-23T10:32:42.123456Z"}, + data_out="2015-09-23T10:32:42.123456", + ), + DateConversionCase( + converter=MongoDBCrateDBConverter(timestamp_to_iso8601=True), + data_in={"$date": {"$numberLong": "1655210544987"}}, + data_out="2022-06-14T12:42:24.987000+00:00", + ), +] + + +testdata_ids = [ + "vanilla-$date-canonical", + "vanilla-$date-legacy", + "epochms-$date-canonical", + "epochms-$date-legacy", + "iso8601-$date-canonical", + "iso8601-$date-legacy", +] + + +@pytest.mark.parametrize("testcase", testdata, ids=testdata_ids) +def test_convert_timestamp_many(testcase: DateConversionCase): + """ + Verify converting timestamps using different modifiers. + """ + assert testcase.converter.decode_document(testcase.data_in) == testcase.data_out + + def test_convert_with_treatment_ignore_complex_lists(): """ The `ignore_complex_lists` treatment ignores lists of dictionaries, often having deviating substructures. @@ -83,7 +174,11 @@ def test_convert_with_treatment_ignore_complex_lists(): treatment = Treatment(ignore_complex_lists=True) transformation = CollectionTransformation(treatment=treatment) - converter = MongoDBCrateDBConverter(transformation=transformation) + converter = MongoDBCrateDBConverter( + timestamp_to_epoch=True, + timestamp_use_milliseconds=True, + transformation=transformation, + ) assert converter.decode_document(data_in) == data_out @@ -119,7 +214,11 @@ def test_convert_with_treatment_normalize_complex_lists(): treatment = Treatment(normalize_complex_lists=True) transformation = CollectionTransformation(treatment=treatment) - converter = MongoDBCrateDBConverter(transformation=transformation) + converter = MongoDBCrateDBConverter( + timestamp_to_epoch=True, + timestamp_use_milliseconds=True, + transformation=transformation, + ) assert converter.decode_document(data_in) == data_out @@ -165,7 +264,11 @@ def test_convert_with_treatment_all_options(): ], ) transformation = CollectionTransformation(treatment=treatment) - converter = MongoDBCrateDBConverter(transformation=transformation) + converter = MongoDBCrateDBConverter( + timestamp_to_epoch=True, + timestamp_use_milliseconds=True, + transformation=transformation, + ) assert converter.decode_document(data_in) == data_out @@ -180,6 +283,10 @@ def test_convert_transform_timestamp(): values=ValueConverter().add(pointer="/timestamp", transformer="to_unixtime"), ) transformation = CollectionTransformation(bucket=bucket_transformation) - converter = MongoDBCrateDBConverter(transformation=transformation) + converter = MongoDBCrateDBConverter( + timestamp_to_epoch=True, + timestamp_use_milliseconds=True, + transformation=transformation, + ) data = converter.decode_documents(data_in) assert data == data_out diff --git a/tests/transform/mongodb/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py index 9c7bd6a..d0d447f 100644 --- a/tests/transform/mongodb/test_mongodb_full.py +++ b/tests/transform/mongodb/test_mongodb_full.py @@ -43,7 +43,11 @@ def make_translator(kind: str) -> MongoDBFullLoadTranslator: .jq(".[] |= (.python.to_list |= to_array)") .jq(".[] |= (.python.to_string |= tostring)") ) - converter = MongoDBCrateDBConverter(transformation=transformation) + converter = MongoDBCrateDBConverter( + timestamp_to_epoch=True, + timestamp_use_milliseconds=True, + transformation=transformation, + ) translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=converter) return translator