diff --git a/CHANGES.md b/CHANGES.md index 775acb2..7e60d75 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- Changed `UPDATE` statements from DMS not to write the entire `data` + column. This allows defining primary keys on the sink table. ## 2024/08/16 v0.0.5 - Changed `UPDATE` statements from DynamoDB not to write the entire `data` diff --git a/src/commons_codec/transform/aws_dms.py b/src/commons_codec/transform/aws_dms.py index 05d63f0..97d55a9 100644 --- a/src/commons_codec/transform/aws_dms.py +++ b/src/commons_codec/transform/aws_dms.py @@ -79,9 +79,9 @@ def to_sql(self) -> str: sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" elif self.operation == "update": - values_clause = self.record_to_values() + values_clause = self.record_to_update() where_clause = self.keys_to_where() - sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" + sql = f"UPDATE {self.address.fqn} SET {values_clause} WHERE {where_clause};" elif self.operation == "delete": where_clause = self.keys_to_where() @@ -94,6 +94,28 @@ def to_sql(self) -> str: return sql + def record_to_update(self) -> str: + """ + Serializes an image to a comma-separated list of column/values pairs + that can be used in the `SET` clause of an `UPDATE` statement. + Primary key columns are skipped, since they cannot be updated. + + IN + {'age': 33, 'attributes': '{"foo": "bar"}', 'id': 42, 'name': 'John'} + + OUT + data['age'] = '33', data['attributes'] = '{"foo": "bar"}', data['name'] = 'John' + """ + constraints: t.List[str] = [] + for column_name, column_value in self.record["data"].items(): + # Skip primary key columns, they cannot be updated + if column_name in self.primary_keys: + continue + + constraint = f"{self.DATA_COLUMN}['{column_name}'] = '{column_value}'" + constraints.append(constraint) + return ", ".join(constraints) + def record_to_values(self) -> str: """ Apply type translations to record, and serialize to JSON. diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py index ddf825e..cdec25c 100644 --- a/tests/transform/test_aws_dms.py +++ b/tests/transform/test_aws_dms.py @@ -237,7 +237,7 @@ def test_decode_cdc_update_success(cdc): # Emulate an UPDATE operation. assert ( cdc.to_sql(MSG_DATA_UPDATE_VALUE) == 'UPDATE "public"."foo" ' - f"SET data = '{json.dumps(RECORD_UPDATE)}' " + "SET data['age'] = '33', data['attributes'] = '{\"foo\": \"bar\"}', data['name'] = 'John' " "WHERE data['id'] = '42';" )