Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB: Add Zyp transformations to CDC subsystem #288

Merged
merged 2 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
MongoDB: Add Zyp transformations to the CDC subsystem
... making it more symmetric to the full-load procedure.
  • Loading branch information
amotl committed Oct 9, 2024
commit cfc161f86ce1b08a769f47f429647f291a6af0bb
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- MongoDB: Added Zyp transformations to the CDC subsystem,
making it more symmetric to the full-load procedure.

## 2024/10/09 v0.0.28
- IO: Improved `BulkProcessor` when running per-record operations by
Expand Down
6 changes: 5 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ def load_table(

from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
return mongodb_relay_cdc(
source_url_obj,
target_url,
transformation=transformation,
)
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

Expand Down
13 changes: 13 additions & 0 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def record_count(self, filter_=None) -> int:
def query(self):
raise NotImplementedError()

@abstractmethod
def subscribe(self):
raise NotImplementedError()


@define
class MongoDBFilesystemAdapter(MongoDBAdapterBase):
Expand Down Expand Up @@ -122,6 +126,9 @@ def query(self):
raise ValueError(f"Unsupported file type: {self._path.suffix}")
return batches(data, self.batch_size)

def subscribe(self):
raise NotImplementedError("Subscribing to a change stream is not supported by filesystem adapter")


@define
class MongoDBResourceAdapter(MongoDBAdapterBase):
Expand Down Expand Up @@ -153,6 +160,9 @@ def query(self):
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)

def subscribe(self):
raise NotImplementedError("HTTP+BSON loader does not support subscribing to a change stream")


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
Expand Down Expand Up @@ -193,6 +203,9 @@ def query(self):
)
return batches(data, self.batch_size)

def subscribe(self):
return self._mongodb_collection.watch(full_document="updateLookup")


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
if mongodb_uri.scheme.startswith("file"):
Expand Down
24 changes: 10 additions & 14 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ def mongodb_copy(
return outcome


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
def mongodb_relay_cdc(
source_url,
target_url,
transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None,
):
"""
Synopsis
--------
Expand All @@ -191,22 +195,14 @@ def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
"""
logger.info("Running MongoDB CDC relay")

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

cratedb_address = DatabaseAddress.from_string(target_url)
cratedb_uri, cratedb_table_address = cratedb_address.decode()
# Optionally configure transformations.
tm = TransformationManager.from_any(transformation)

# Configure machinery.
relay = MongoDBCDCRelayCrateDB(
mongodb_url=str(mongodb_uri),
mongodb_database=mongodb_database,
mongodb_collection=mongodb_collection,
cratedb_sqlalchemy_url=str(cratedb_uri),
cratedb_table=cratedb_table_address.fullname,
mongodb_url=source_url,
cratedb_url=target_url,
tm=tm,
)

# Invoke machinery.
Expand Down
65 changes: 50 additions & 15 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
"""

import logging
import typing as t

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslator
from boltons.urlutils import URL
from commons_codec.transform.mongodb import MongoDBCDCTranslator, MongoDBCrateDBConverter
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand All @@ -25,17 +30,47 @@ class MongoDBCDCRelayCrateDB:

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_sqlalchemy_url: str,
cratedb_table: str,
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "ignore",
debug: bool = True,
):
self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslator(table_name=self.table_name)
self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

# Decode database URL: MongoDB.
self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri)

# Decode database URL: CrateDB.
self.cratedb_address = DatabaseAddress(self.cratedb_uri)
self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode()
cratedb_table = self.cratedb_table_address.fullname

self.cratedb_adapter = DatabaseAdapter(str(self.cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)

# Transformation machinery.
transformation = None
if tm:
address = CollectionAddress(
container=self.mongodb_adapter.database_name, name=self.mongodb_adapter.collection_name
)
try:
transformation = tm.project.get(address=address)
logger.info(f"Applying transformation to: {address}")
except KeyError:
logger.warning(f"No transformation found for: {address}")
self.converter = MongoDBCrateDBConverter(
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)

self.cdc = MongoDBCDCTranslator(table_name=self.cratedb_table, converter=self.converter)

self.on_error = on_error
self.debug = debug

def start(self):
"""
Expand All @@ -52,10 +87,10 @@ def cdc_to_sql(self):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
"""
# Note that `.watch()` will block until events are ready for consumption, so
# this is not a busy loop.
# Note that `.subscribe()` (calling `.watch()`) will block until events are ready
# for consumption, so this is not a busy loop.
# FIXME: Note that the function does not perform any sensible error handling yet.
while True:
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
with self.mongodb_adapter.subscribe() as change_stream:
for change in change_stream:
yield self.cdc.to_sql(change)
7 changes: 7 additions & 0 deletions doc/io/mongodb/cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-c
```


## Transformations
You can use [Zyp Transformations] to change the shape of the data while it is
being transferred. To add them to the pipeline, use the `--transformation`
command line option.


## Appendix
A few operations that are handy when exploring this exercise.

Expand Down Expand Up @@ -154,3 +160,4 @@ mongosh "${MONGODB_URL}" --eval 'db.demo.drop()'
[MongoDB Atlas]: https://www.mongodb.com/atlas
[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/
[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81
[Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/
4 changes: 2 additions & 2 deletions doc/io/mongodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Use the HTTP URL query parameter `offset` on the source URL, like
`&offset=42`, in order to start processing at this record, counted from
the beginning.

## Zyp Transformations
## Transformations
You can use [Zyp Transformations] to change the shape of the data while it is
being transferred. To add them to the pipeline, use the `--transformation`
command line option.
Expand Down Expand Up @@ -244,5 +244,5 @@ recent versions like MongoDB 7 and tools version 100.9.5 or higher.
[libbson test files]: https://github.com/mongodb/mongo-c-driver/tree/master/src/libbson/tests/json
[MongoDB Extended JSON]: https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
[mongodb-json-files]: https://github.com/ozlerhakan/mongodb-json-files
[Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/index.html
[Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/
[zyp-mongodb-json-files.yaml]: https://github.com/crate/cratedb-toolkit/blob/v0.0.22/examples/zyp/zyp-mongodb-json-files.yaml