Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Change CrateDB data model to use (pk, data, aux) columns #64

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- DynamoDB: Change CrateDB data model to use (`pk`, `data`, `aux`) columns
Attention: This is a breaking change.

## 2024/09/26 v0.0.19
- DynamoDB CDC: Fix `MODIFY` operation by propagating `NewImage` fully
Expand Down
7 changes: 4 additions & 3 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,15 @@ def to_sql(self):


@define
class DualRecord:
class UniversalRecord:
"""
Manage two halves of a record.
Manage a universal record including primary keys and two halves of a record.
One bucket stores the typed fields, the other stores the untyped ones.
"""

pk: t.Dict[str, t.Any]
typed: t.Dict[str, t.Any]
untyped: t.Dict[str, t.Any]

def to_dict(self):
return {"typed": self.typed, "untyped": self.untyped}
return {"pk": self.pk, "typed": self.typed, "untyped": self.untyped}
101 changes: 57 additions & 44 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import (
DualRecord,
SQLOperation,
SQLParameterizedWhereClause,
UniversalRecord,
)
from commons_codec.transform.dynamodb_model import PrimaryKeySchema
from commons_codec.util.data import TaggableList
from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer

Expand Down Expand Up @@ -73,6 +73,10 @@ class DynamoTranslatorBase:
Translate DynamoDB records into a different representation.
"""

# Define name of the column where KeySchema DynamoDB fields will get materialized into.
# This column uses the `OBJECT(DYNAMIC)` data type.
PK_COLUMN = "pk"

# Define name of the column where typed DynamoDB fields will get materialized into.
# This column uses the `OBJECT(DYNAMIC)` data type.
TYPED_COLUMN = "data"
Expand All @@ -81,22 +85,27 @@ class DynamoTranslatorBase:
# This column uses the `OBJECT(IGNORED)` data type.
UNTYPED_COLUMN = "aux"

def __init__(self, table_name: str):
def __init__(self, table_name: str, primary_key_schema: PrimaryKeySchema = None):
super().__init__()
self.table_name = quote_relation_name(table_name)
self.primary_key_schema = primary_key_schema
self.deserializer = CrateDBTypeDeserializer()

@property
def sql_ddl(self):
"""`
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
if self.primary_key_schema is None:
raise IOError("Unable to generate SQL DDL without key schema information")
return (
f"CREATE TABLE IF NOT EXISTS {self.table_name} "
f"({self.TYPED_COLUMN} OBJECT(DYNAMIC), {self.UNTYPED_COLUMN} OBJECT(IGNORED));"
f"CREATE TABLE IF NOT EXISTS {self.table_name} ("
f"{self.PK_COLUMN} OBJECT(STRICT) AS ({', '.join(self.primary_key_schema.to_sql_ddl_clauses())}), "
f"{self.TYPED_COLUMN} OBJECT(DYNAMIC), "
f"{self.UNTYPED_COLUMN} OBJECT(IGNORED));"
)

def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
def decode_record(self, item: t.Dict[str, t.Any], key_names: t.Union[t.List[str], None] = None) -> UniversalRecord:
"""
Deserialize DynamoDB JSON record into vanilla Python.

Expand Down Expand Up @@ -124,20 +133,37 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
-- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors
"""
record = toolz.valmap(self.deserializer.deserialize, item)

pk = {}
untyped = {}
pk_names = key_names or []
if not pk_names and self.primary_key_schema is not None:
pk_names = self.primary_key_schema.keys()
for key, value in record.items():
if key in pk_names:
pk[key] = value
if isinstance(value, TaggableList) and value.get_tag("varied", False):
untyped[key] = value
record = toolz.dissoc(record, *pk.keys())
record = toolz.dissoc(record, *untyped.keys())
return DualRecord(typed=record, untyped=untyped)
return UniversalRecord(pk=pk, typed=record, untyped=untyped)


class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
def to_sql(self, data: t.Union[RecordType, t.List[RecordType]]) -> SQLOperation:
"""
Produce INSERT SQL operations (SQL statement and parameters) from DynamoDB record(s).
"""
sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
sql = (
f"INSERT INTO {self.table_name} ("
f"{self.PK_COLUMN}, "
f"{self.TYPED_COLUMN}, "
f"{self.UNTYPED_COLUMN}"
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
)
if not isinstance(data, list):
data = [data]
parameters = [self.decode_record(record).to_dict() for record in data]
Expand Down Expand Up @@ -166,56 +192,43 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
raise ValueError(f"Unknown eventSource: {event_source}")

if event_name == "INSERT":
dual_record = self.decode_record(event["dynamodb"]["NewImage"])
record = self.decode_event(event["dynamodb"])
sql = (
f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
f"INSERT INTO {self.table_name} ("
f"{self.PK_COLUMN}, "
f"{self.TYPED_COLUMN}, "
f"{self.UNTYPED_COLUMN}"
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
)
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
parameters = record.to_dict()

elif event_name == "MODIFY":
new_image = event["dynamodb"]["NewImage"]
# Drop primary key columns to not update them.
# Primary key values should be identical (if chosen identical in DynamoDB and CrateDB),
# but CrateDB does not allow having them in an UPDATE's SET clause.
for key in event["dynamodb"]["Keys"]:
del new_image[key]

dual_record = self.decode_record(event["dynamodb"]["NewImage"])

where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
record = self.decode_event(event["dynamodb"])
sql = (
f"UPDATE {self.table_name} "
f"SET {self.TYPED_COLUMN}=:typed, {self.UNTYPED_COLUMN}=:untyped "
f"WHERE {where_clause.to_sql()};"
f"WHERE {self.PK_COLUMN}=:pk;"
)
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
parameters.update(where_clause.values)
parameters = record.to_dict()

elif event_name == "REMOVE":
where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"DELETE FROM {self.table_name} WHERE {where_clause.to_sql()};"
parameters = where_clause.values # noqa: PD011
record = self.decode_event(event["dynamodb"])
sql = f"DELETE FROM {self.table_name} WHERE {self.PK_COLUMN}=:pk;"
parameters = record.to_dict()

else:
raise ValueError(f"Unknown CDC event name: {event_name}")

return SQLOperation(sql, parameters)

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.

IN (top-level stripped):
"Keys": {
"device": {"S": "foo"},
"timestamp": {"S": "2024-07-12T01:17:42"},
}
def decode_event(self, event: t.Dict[str, t.Any]) -> UniversalRecord:
# That's for INSERT+MODIFY.
if "NewImage" in event:
return self.decode_record(event["NewImage"], event["Keys"].keys())

OUT:
WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'
"""
dual_record = self.decode_record(keys)
clause = SQLParameterizedWhereClause()
for key_name, key_value in dual_record.typed.items():
clause.add(lval=f"{self.TYPED_COLUMN}['{key_name}']", name=key_name, value=key_value)
return clause
# That's for REMOVE.
else:
return self.decode_record(event["Keys"], event["Keys"].keys())
82 changes: 82 additions & 0 deletions src/commons_codec/transform/dynamodb_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import sys
import typing as t

from attr import Factory, define

if sys.version_info >= (3, 11):
from enum import StrEnum
else:
from backports.strenum import StrEnum # pragma: no cover


class AttributeType(StrEnum):
STRING = "STRING"
NUMBER = "NUMBER"
BINARY = "BINARY"


DYNAMODB_TYPE_MAP = {
"S": AttributeType.STRING,
"N": AttributeType.NUMBER,
"B": AttributeType.BINARY,
}

CRATEDB_TYPE_MAP = {
AttributeType.STRING: "STRING",
AttributeType.NUMBER: "BIGINT",
AttributeType.BINARY: "STRING",
}


@define
class Attribute:
name: str
type: AttributeType

@classmethod
def from_dynamodb(cls, name: str, type_: str):
try:
return cls(name=name, type=DYNAMODB_TYPE_MAP[type_])
except KeyError as ex:
raise KeyError(f"Mapping DynamoDB type failed: name={name}, type={type_}") from ex

@property
def cratedb_type(self):
return CRATEDB_TYPE_MAP[self.type]


@define
class PrimaryKeySchema:
schema: t.List[Attribute] = Factory(list)

def add(self, name: str, type: str) -> "PrimaryKeySchema": # noqa: A002
self.schema.append(Attribute.from_dynamodb(name, type))
return self

@classmethod
def from_table(cls, table) -> "PrimaryKeySchema":
"""
# attribute_definitions: [{'AttributeName': 'Id', 'AttributeType': 'N'}]
# key_schema: [{'AttributeName': 'Id', 'KeyType': 'HASH'}]
"""

schema = cls()
attribute_type_map: t.Dict[str, str] = {}
for attr in table.attribute_definitions:
attribute_type_map[attr["AttributeName"]] = attr["AttributeType"]

for key in table.key_schema:
name = key["AttributeName"]
type_ = attribute_type_map[name]
schema.add(name=name, type=type_)

return schema

def keys(self) -> t.List[str]:
return [attribute.name for attribute in self.schema]

def column_names(self) -> t.List[str]:
return [f'"{attribute.name}"' for attribute in self.schema]

def to_sql_ddl_clauses(self) -> t.List[str]:
return [f'"{attribute.name}" {attribute.cratedb_type} PRIMARY KEY' for attribute in self.schema]
13 changes: 13 additions & 0 deletions tests/transform/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import pytest

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator, DynamoDBFullLoadTranslator
from commons_codec.transform.dynamodb_model import PrimaryKeySchema

RESET_TABLES = [
"from.dynamodb",
"from.mongodb",
Expand All @@ -13,3 +16,13 @@ def cratedb(cratedb_service):
"""
cratedb_service.reset(tables=RESET_TABLES)
yield cratedb_service


@pytest.fixture
def dynamodb_full_translator_foo():
return DynamoDBFullLoadTranslator(table_name="foo", primary_key_schema=PrimaryKeySchema().add("id", "S"))


@pytest.fixture
def dynamodb_cdc_translator_foo():
return DynamoDBCDCTranslator(table_name="foo")
Loading