Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

DBZ-8018 Use all record fields as primary key columns when no primary key fields are specified #89

Merged
merged 1 commit on
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -396,30 +397,25 @@ private void applyRecordHeaderAsPrimaryKey(SinkRecord record) {
}

private void applyRecordValueAsPrimaryKey(SinkRecord record, boolean flattened) {
if (primaryKeyFields.isEmpty()) {
throw new ConnectException("At least one " + JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS +
" field name should be specified when resolving keys from the record's value.");
}

final Schema valueSchema = record.valueSchema();
if (valueSchema == null) {
throw new ConnectException("Configured primary key mode 'record_value' cannot have null schema");
}
else if (flattened) {
for (Field field : record.valueSchema().fields()) {
if (primaryKeyFields.contains(field.name())) {
addKeyField(record.topic(), field);
}
}

Stream<Field> recordFields;
if (flattened) {
recordFields = record.valueSchema().fields().stream();
}
else {
final Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
for (Field field : after.schema().fields()) {
if (primaryKeyFields.contains(field.name())) {
addKeyField(record.topic(), field);
}
}
recordFields = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER).schema().fields().stream();
}

if (!primaryKeyFields.isEmpty()) {
recordFields = recordFields.filter(field -> primaryKeyFields.contains(field.name()));
}

recordFields.forEach(field -> addKeyField(record.topic(), field));
}

private void applyPrimitiveRecordKeyAsPrimaryKey(Schema keySchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,19 @@ public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordValueWithNoFie

final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
try {
consume(factory.createRecord(topicName));
stopSinkConnector();
}
catch (Exception e) {
assertThat(e.getCause().getCause().getMessage()).contains("At least one primary.key.fields field name should be specified");
}
final SinkRecord createRecord = factory.createRecordNoKey(topicName);
consume(createRecord);

final String destinationTableName = destinationTableName(createRecord);

final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName);
tableAssert.exists().hasNumberOfColumns(3);

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT, "John Doe");
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$");

assertHasPrimaryKeyColumns(destinationTableName, "id", "name", "nick_name$");
}

@ParameterizedTest
Expand Down Expand Up @@ -396,6 +402,7 @@ protected void assertHasPrimaryKeyColumns(String tableName, boolean caseInsensit
}
else if (caseInsensitive) {
pkColumnNames = pkColumnNames.stream().map(String::toLowerCase).collect(Collectors.toList());
assertThat(pkColumnNames.size()).isEqualTo(columnNames.length);
for (int columnIndex = 0; columnIndex < columnNames.length; ++columnIndex) {
assertThat(pkColumnNames).contains(columnNames[columnIndex].toLowerCase(), Index.atIndex(columnIndex));
}
Expand Down