Skip to content

Commit

Permalink
DynamoDB: Change CrateDB data model to use (pk, data, aux) columns
Browse files Browse the repository at this point in the history
By breaking the primary key information out of the main record's data
bucket, the main record can be updated as-is on CDC MODIFY operations.
  • Loading branch information
amotl committed Sep 30, 2024
1 parent d3c7cfe commit c4e36b9
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

## Unreleased
- MongoDB: Updated to pymongo 4.9
- DynamoDB: Change CrateDB data model to use (`pk`, `data`, `aux`) columns
Attention: This is a breaking change.

## 2024/09/26 v0.0.26
- MongoDB: Configure `MongoDBCrateDBConverter` after updating to commons-codec 0.0.18
Expand Down
16 changes: 16 additions & 0 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import typing as t

import boto3
from commons_codec.transform.dynamodb_model import PrimaryKeySchema
from yarl import URL

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,3 +56,18 @@ def scan(
def count_records(self, table_name: str):
table = self.dynamodb_resource.Table(table_name)
return table.item_count

def primary_key_schema(self, table_name: str) -> PrimaryKeySchema:
"""
Return primary key information for given table, derived from `KeySchema` [1] and `AttributeDefinition` [2].
AttributeType:
- S - the attribute is of type String
- N - the attribute is of type Number
- B - the attribute is of type Binary
[1] https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html#DDB-CreateTable-request-KeySchema
[2] https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeDefinition.html
"""
table = self.dynamodb_resource.Table(table_name)
return PrimaryKeySchema.from_table(table)
10 changes: 6 additions & 4 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def __init__(
self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.on_error = on_error
self.progress = progress
Expand All @@ -54,12 +53,15 @@ def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
records_in = self.dynamodb_adapter.count_records(table_name=self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")

primary_key_schema = self.dynamodb_adapter.primary_key_schema(table_name=self.dynamodb_table)
translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table, primary_key_schema=primary_key_schema)

with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.execute(sa.text(translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
Expand All @@ -68,7 +70,7 @@ def start(self):
processor = BulkProcessor(
connection=connection,
data=self.fetch(),
batch_to_operation=self.translator.to_sql,
batch_to_operation=translator.to_sql,
progress_bar=progress_bar,
on_error=self.on_error,
debug=self.debug,
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "commons-codec>=0.0.19",
# "commons-codec>=0.0.20",
# "sqlalchemy-cratedb==0.38.0",
# ]
# ///
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ docs = [
]
dynamodb = [
"boto3",
"commons-codec>=0.0.19",
"commons-codec>=0.0.20",
]
full = [
"cratedb-toolkit[cfr,cloud,datasets,io,service]",
Expand All @@ -163,11 +163,11 @@ io = [
kinesis = [
"aiobotocore<2.16",
"async-kinesis<1.2",
"commons-codec>=0.0.19",
"commons-codec>=0.0.20",
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.19",
"commons-codec[mongodb,zyp]>=0.0.20",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<4.10,>=3.10.1",
Expand Down
19 changes: 13 additions & 6 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_ma

data_in = {
"Id": {"N": "101"},
"Name": {"S": "Hotzenplotz"},
}
data_out = {
"Id": 101.0,
record_out = {
"pk": {
"Id": 101.0,
},
"data": {
"Name": "Hotzenplotz",
},
"aux": {},
}

# Define source and target URLs.
Expand All @@ -34,7 +41,7 @@ def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_ma
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == data_out
assert results[0] == record_out


def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_manager):
Expand All @@ -48,8 +55,8 @@ def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_ma
{"Id": {"N": "3"}, "name": {"S": "Baz"}},
]
data_out = [
{"data": {"Id": 1, "name": "Foo"}, "aux": {}},
{"data": {"Id": 3, "name": "Baz"}, "aux": {}},
{"pk": {"Id": 1}, "data": {"name": "Foo"}, "aux": {}},
{"pk": {"Id": 3}, "data": {"name": "Baz"}, "aux": {}},
]

# Define source and target URLs.
Expand All @@ -68,7 +75,7 @@ def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_ma
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 2

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY data['Id'];", records=True) # noqa: S608
results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY pk['Id'];", records=True) # noqa: S608
assert results == data_out

assert "Dynamic nested arrays are not supported" in caplog.text
7 changes: 5 additions & 2 deletions tests/io/dynamodb/test_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time

import pytest
from commons_codec.transform.dynamodb_model import PrimaryKeySchema

from cratedb_toolkit.io.kinesis.relay import KinesisRelay
from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis
Expand Down Expand Up @@ -33,7 +34,8 @@ def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
table_name = '"testdrive"."demo"'

# Create target table.
cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)
translator = DynamoDBCDCTranslator(table_name=table_name, primary_key_schema=PrimaryKeySchema().add("id", "S"))
cratedb.database.run_sql(translator.sql_ddl)

# Define two CDC events: INSERT and UPDATE.
events = [
Expand Down Expand Up @@ -76,7 +78,8 @@ def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
table_name = '"testdrive"."demo"'

# Create target table.
cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)
translator = DynamoDBCDCTranslator(table_name=table_name, primary_key_schema=PrimaryKeySchema().add("id", "S"))
cratedb.database.run_sql(translator.sql_ddl)

# Define two CDC events: INSERT and UPDATE.
events = [
Expand Down
4 changes: 3 additions & 1 deletion tests/io/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
pytest.importorskip("commons_codec", reason="Only works with commons-codec installed")

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402
from commons_codec.transform.dynamodb_model import PrimaryKeySchema # noqa: E402

DYNAMODB_CDC_INSERT_NESTED = {
"awsRegion": "us-east-1",
Expand Down Expand Up @@ -122,7 +123,8 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker
table_name = '"testdrive"."demo"'

# Create target table.
cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)
translator = DynamoDBCDCTranslator(table_name=table_name, primary_key_schema=PrimaryKeySchema().add("id", "S"))
cratedb.database.run_sql(translator.sql_ddl)

# Configure Lambda processor per environment variables.
handler_environment = {
Expand Down

0 comments on commit c4e36b9

Please sign in to comment.