DBZ-8346 Use DebeziumSinkRecord instead of Kafka Connect's SinkRecord inside Debezium sink connectors

- pt.2 Switching to DebeziumSinkRecord instead of SinkRecord
rk3rn3r authored and jpechane committed Nov 14, 2024
1 parent 6e4d6be commit ee60553
Showing 59 changed files with 1,046 additions and 998 deletions.
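
The heart of the change shows up in JdbcChangeEventSink below: instead of building a SinkRecordDescriptor for every Kafka Connect SinkRecord, the sink now wraps each record in a JdbcKafkaSinkRecord, an implementation of the connector-neutral DebeziumSinkRecord contract. A minimal before/after sketch, assembled from the hunks in this commit:

// Before (pt. 1): each Kafka Connect record was converted into a
// SinkRecordDescriptor through a builder before buffering.
SinkRecordDescriptor descriptor = SinkRecordDescriptor.builder()
        .withPrimaryKeyMode(config.getPrimaryKeyMode())
        .withPrimaryKeyFields(config.getPrimaryKeyFields())
        .withFieldFilter(config.getFieldFilter())
        .withSinkRecord(kafkaSinkRecord)
        .withDialect(dialect)
        .build();

// After (pt. 2): the record is wrapped once, and everything downstream
// (buffers, dialect statements, naming strategy) works against the
// DebeziumSinkRecord/JdbcSinkRecord abstraction.
JdbcSinkRecord record = new JdbcKafkaSinkRecord(kafkaSinkRecord,
        config.getPrimaryKeyMode(), config.getPrimaryKeyFields(),
        config.getFieldFilter(), dialect);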
@@ -15,18 +15,18 @@
 public interface Buffer {
 
     /**
-     * to add a {@link SinkRecordDescriptor} to the internal buffer and
+     * to add a {@link JdbcSinkRecord} to the internal buffer and
      * call the {@link Buffer#flush()} when buffer size >= {@link JdbcSinkConnectorConfig#getBatchSize()}
-     * @param recordDescriptor the Sink record descriptor
+     * @param record the Debezium sink record
      * @return the buffer records
      */
-    List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor);
+    List<JdbcSinkRecord> add(JdbcSinkRecord record);
 
     /**
      * to clear and flush the internal buffer
-     * @return {@link SinkRecordDescriptor} the flushed buffer records.
+     * @return {@link JdbcSinkRecord} the flushed buffer records.
      */
-    List<SinkRecordDescriptor> flush();
+    List<JdbcSinkRecord> flush();
 
     /**
      * to check whether buffer is empty or not.
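
The revised Buffer contract is unchanged in shape, just retyped: add() hands back a batch to write once the configured batch size is reached, and flush() drains the rest. A minimal usage sketch, assuming the caller has already resolved the target CollectionId and reuses a flush routine like the flushBufferWithRetries(...) shown in JdbcChangeEventSink further down:

void drainToTable(JdbcSinkConnectorConfig config, CollectionId collectionId,
                  List<JdbcSinkRecord> records) {
    Buffer buffer = new RecordBuffer(config); // or new ReducedRecordBuffer(config)
    for (JdbcSinkRecord record : records) {
        // add() returns the records to flush once the batch is full,
        // judging by how execute() feeds its result straight to the flusher
        List<JdbcSinkRecord> toFlush = buffer.add(record);
        flushBufferWithRetries(collectionId, toFlush);
    }
    // drain whatever is still buffered
    flushBufferWithRetries(collectionId, buffer.flush());
}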
@@ -6,6 +6,7 @@
 package io.debezium.connector.jdbc;
 
 import static io.debezium.connector.jdbc.JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE;
+import static io.debezium.connector.jdbc.JdbcSinkRecord.FieldDescriptor;
 
 import java.sql.SQLException;
 import java.time.Duration;
@@ -27,15 +28,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor;
 import io.debezium.connector.jdbc.dialect.DatabaseDialect;
 import io.debezium.connector.jdbc.relational.TableDescriptor;
 import io.debezium.metadata.CollectionId;
+import io.debezium.sink.DebeziumSinkRecord;
 import io.debezium.sink.spi.ChangeEventSink;
 import io.debezium.util.Clock;
 import io.debezium.util.Metronome;
 import io.debezium.util.Stopwatch;
-import io.debezium.util.Strings;
 
 /**
  * A {@link ChangeEventSink} for a JDBC relational database.
@@ -48,6 +48,7 @@ public class JdbcChangeEventSink implements ChangeEventSink {
 
     public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
     public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";
+
     private final JdbcSinkConnectorConfig config;
     private final DatabaseDialect dialect;
     private final StatelessSession session;
@@ -68,39 +69,36 @@ public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession sess
         LOGGER.info("Database version {}.{}.{}", version.getMajor(), version.getMinor(), version.getMicro());
     }
 
     @Override
     public void execute(Collection<SinkRecord> records) {
         final Map<CollectionId, Buffer> upsertBufferByTable = new LinkedHashMap<>();
         final Map<CollectionId, Buffer> deleteBufferByTable = new LinkedHashMap<>();
 
-        for (SinkRecord record : records) {
+        for (SinkRecord kafkaSinkRecord : records) {
 
+            JdbcSinkRecord record = new JdbcKafkaSinkRecord(kafkaSinkRecord, config.getPrimaryKeyMode(), config.getPrimaryKeyFields(), config.getFieldFilter(), dialect);
             LOGGER.trace("Processing {}", record);
 
             validate(record);
 
             Optional<CollectionId> optionalCollectionId = getCollectionIdFromRecord(record);
             if (optionalCollectionId.isEmpty()) {
-
-                LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name", record.topic(), record.kafkaPartition(),
-                        record.kafkaOffset());
+                LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name", record.topicName(), record.partition(),
+                        record.offset());
                 continue;
             }
 
-            SinkRecordDescriptor sinkRecordDescriptor = buildRecordSinkDescriptor(record);
-
-            if (sinkRecordDescriptor.isTombstone()) {
+            if (record.isTombstone()) {
                 // Skip only Debezium Envelope tombstone not the one produced by ExtractNewRecordState SMT
-                LOGGER.debug("Skipping tombstone record {}", sinkRecordDescriptor);
+                LOGGER.debug("Skipping tombstone record {}", record);
                 continue;
             }
 
             final CollectionId collectionId = optionalCollectionId.get();
 
-            if (sinkRecordDescriptor.isTruncate()) {
-
+            if (record.isTruncate()) {
                 if (!config.isTruncateEnabled()) {
-                    LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", sinkRecordDescriptor.getTopicName());
+                    LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", record.topicName());
                     continue;
                 }
 
@@ -109,7 +107,7 @@ public void execute(Collection<SinkRecord> records) {
                 flushBuffers(deleteBufferByTable);
 
                 try {
-                    final TableDescriptor table = checkAndApplyTableChangesIfNeeded(collectionId, sinkRecordDescriptor);
+                    final TableDescriptor table = checkAndApplyTableChangesIfNeeded(collectionId, record);
                     writeTruncate(dialect.getTruncateStatement(table));
                     continue;
                 }
@@ -118,10 +116,9 @@
                 }
             }
 
-            if (sinkRecordDescriptor.isDelete()) {
-
+            if (record.isDelete()) {
                 if (!config.isDeleteEnabled()) {
-                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
+                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", record.topicName());
                     continue;
                 }
 
@@ -132,10 +129,8 @@
                     flushBufferWithRetries(collectionId, upsertBufferByTable.get(collectionId).flush());
                 }
 
-                Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, collectionId, sinkRecordDescriptor);
-
-                List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
-
+                Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, collectionId, record);
+                List<JdbcSinkRecord> toFlush = tableIdBuffer.add(record);
                 flushBufferWithRetries(collectionId, toFlush);
             }
             else {
@@ -148,9 +143,9 @@
                 Stopwatch updateBufferStopwatch = Stopwatch.reusable();
                 updateBufferStopwatch.start();
 
-                Buffer tableIdBuffer = resolveBuffer(upsertBufferByTable, collectionId, sinkRecordDescriptor);
+                Buffer tableIdBuffer = resolveBuffer(upsertBufferByTable, collectionId, record);
 
-                List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
+                List<JdbcSinkRecord> toFlush = tableIdBuffer.add(record);
                 updateBufferStopwatch.stop();
 
                 LOGGER.trace("[PERF] Update buffer execution time {}", updateBufferStopwatch.durations());
@@ -163,52 +158,27 @@
         flushBuffers(deleteBufferByTable);
     }
 
-    private void validate(SinkRecord record) {
-
-        if (isSchemaChange(record)) {
+    private void validate(JdbcSinkRecord record) {
+        if (record.isSchemaChange()) {
             LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
             throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
         }
     }
 
-    private static boolean isSchemaChange(SinkRecord record) {
-        return record.valueSchema() != null
-                && !Strings.isNullOrEmpty(record.valueSchema().name())
-                && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
-    }
-
-    private Buffer resolveBuffer(Map<CollectionId, Buffer> bufferMap, CollectionId collectionId, SinkRecordDescriptor sinkRecordDescriptor) {
-        if (config.isUseReductionBuffer() && !sinkRecordDescriptor.getKeyFieldNames().isEmpty()) {
+    private Buffer resolveBuffer(Map<CollectionId, Buffer> bufferMap, CollectionId collectionId, JdbcSinkRecord record) {
+        if (config.isUseReductionBuffer() && !record.keyFieldNames().isEmpty()) {
             return bufferMap.computeIfAbsent(collectionId, k -> new ReducedRecordBuffer(config));
         }
         else {
             return bufferMap.computeIfAbsent(collectionId, k -> new RecordBuffer(config));
         }
     }
 
-    private SinkRecordDescriptor buildRecordSinkDescriptor(SinkRecord record) {
-
-        SinkRecordDescriptor sinkRecordDescriptor;
-        try {
-            sinkRecordDescriptor = SinkRecordDescriptor.builder()
-                    .withPrimaryKeyMode(config.getPrimaryKeyMode())
-                    .withPrimaryKeyFields(config.getPrimaryKeyFields())
-                    .withFieldFilter(config.getFieldFilter())
-                    .withSinkRecord(record)
-                    .withDialect(dialect)
-                    .build();
-        }
-        catch (Exception e) {
-            throw new ConnectException("Failed to process a sink record", e);
-        }
-        return sinkRecordDescriptor;
-    }
-
     private void flushBuffers(Map<CollectionId, Buffer> bufferByTable) {
         bufferByTable.forEach((collectionId, recordBuffer) -> flushBufferWithRetries(collectionId, recordBuffer.flush()));
     }
 
-    private void flushBufferWithRetries(CollectionId collectionId, List<SinkRecordDescriptor> toFlush) {
+    private void flushBufferWithRetries(CollectionId collectionId, List<JdbcSinkRecord> toFlush) {
         int retries = 0;
         Exception lastException = null;
 
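Note that the schema-change probe moves off the raw Connect schema and behind the record abstraction: validate() now asks record.isSchemaChange() instead of the deleted static helper. A sketch of what an implementation such as JdbcKafkaSinkRecord presumably does, mirroring the removed helper above (the carried-over logic is an assumption, not shown in this hunk):

// Presumed equivalent inside a DebeziumSinkRecord implementation,
// mirroring the deleted isSchemaChange(SinkRecord) helper (assumption):
public boolean isSchemaChange() {
    return valueSchema() != null
            && !Strings.isNullOrEmpty(valueSchema().name())
            && valueSchema().name().contains("SchemaChangeValue");
}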
@@ -241,7 +211,7 @@ private void flushBufferWithRetries(CollectionId collectionId, List<SinkRecordDe
         throw new ConnectException("Exceeded max retries " + flushMaxRetries + " times, failed to process sink records", lastException);
     }
 
-    private void flushBuffer(CollectionId collectionId, List<SinkRecordDescriptor> toFlush) throws SQLException {
+    private void flushBuffer(CollectionId collectionId, List<JdbcSinkRecord> toFlush) throws SQLException {
         Stopwatch flushBufferStopwatch = Stopwatch.reusable();
         Stopwatch tableChangesStopwatch = Stopwatch.reusable();
         if (!toFlush.isEmpty()) {
@@ -270,17 +240,17 @@ public void close() {
         }
     }
 
-    private TableDescriptor checkAndApplyTableChangesIfNeeded(CollectionId collectionId, SinkRecordDescriptor descriptor) throws SQLException {
+    private TableDescriptor checkAndApplyTableChangesIfNeeded(CollectionId collectionId, JdbcSinkRecord record) throws SQLException {
         if (!hasTable(collectionId)) {
             // Table does not exist, lets attempt to create it.
             try {
-                return createTable(collectionId, descriptor);
+                return createTable(collectionId, record);
             }
             catch (SQLException ce) {
                 // It's possible the table may have been created in the interim, so try to alter.
                 LOGGER.warn("Table creation failed for '{}', attempting to alter the table", collectionId.toFullIdentiferString(), ce);
                 try {
-                    return alterTableIfNeeded(collectionId, descriptor);
+                    return alterTableIfNeeded(collectionId, record);
                 }
                 catch (SQLException ae) {
                     // The alter failed, hard stop.
@@ -292,7 +262,7 @@ private TableDescriptor checkAndApplyTableChangesIfNeeded(CollectionId collectio
         else {
             // Table exists, lets attempt to alter it if necessary.
             try {
-                return alterTableIfNeeded(collectionId, descriptor);
+                return alterTableIfNeeded(collectionId, record);
             }
             catch (SQLException ae) {
                 LOGGER.error("Failed to alter the table '{}'.", collectionId.toFullIdentiferString(), ae);
Expand All @@ -309,7 +279,7 @@ private TableDescriptor readTable(CollectionId collectionId) {
return session.doReturningWork((connection) -> dialect.readTable(connection, collectionId));
}

private TableDescriptor createTable(CollectionId collectionId, SinkRecordDescriptor record) throws SQLException {
private TableDescriptor createTable(CollectionId collectionId, JdbcSinkRecord record) throws SQLException {
LOGGER.debug("Attempting to create table '{}'.", collectionId.toFullIdentiferString());

if (NONE.equals(config.getSchemaEvolutionMode())) {
@@ -332,7 +302,7 @@ private TableDescriptor createTable(CollectionId collectionId, SinkRecordDescrip
         return readTable(collectionId);
     }
 
-    private TableDescriptor alterTableIfNeeded(CollectionId collectionId, SinkRecordDescriptor record) throws SQLException {
+    private TableDescriptor alterTableIfNeeded(CollectionId collectionId, JdbcSinkRecord record) throws SQLException {
         LOGGER.debug("Attempting to alter table '{}'.", collectionId.toFullIdentiferString());
 
         if (!hasTable(collectionId)) {
@@ -353,7 +323,7 @@ private TableDescriptor alterTableIfNeeded(CollectionId collectionId, SinkRecord
 
         LOGGER.debug("The follow fields are missing in the table: {}", missingFields);
         for (String missingFieldName : missingFields) {
-            final FieldDescriptor fieldDescriptor = record.getFields().get(missingFieldName);
+            final FieldDescriptor fieldDescriptor = record.allFields().get(missingFieldName);
             if (!fieldDescriptor.getSchema().isOptional() && fieldDescriptor.getSchema().defaultValue() == null) {
                 throw new SQLException(String.format(
                         "Cannot ALTER table '%s' because field '%s' is not optional but has no default value",
@@ -381,14 +351,13 @@ private TableDescriptor alterTableIfNeeded(CollectionId collectionId, SinkRecord
         return readTable(collectionId);
     }
 
-    private String getSqlStatement(TableDescriptor table, SinkRecordDescriptor record) {
-
+    private String getSqlStatement(TableDescriptor table, JdbcSinkRecord record) {
         if (!record.isDelete()) {
             switch (config.getInsertMode()) {
                 case INSERT:
                     return dialect.getInsertStatement(table, record);
                 case UPSERT:
-                    if (record.getKeyFieldNames().isEmpty()) {
+                    if (record.keyFieldNames().isEmpty()) {
                         throw new ConnectException("Cannot write to table " + table.getId().name() + " with no key fields defined.");
                     }
                     return dialect.getUpsertStatement(table, record);
@@ -434,8 +403,8 @@ private boolean isRetriable(Throwable throwable) {
         return isRetriable(throwable.getCause());
     }
 
-    public Optional<CollectionId> getCollectionIdFromRecord(SinkRecord record) {
-        String tableName = this.config.getCollectionNamingStrategy().resolveCollectionName(config, record);
+    public Optional<CollectionId> getCollectionIdFromRecord(DebeziumSinkRecord record) {
+        String tableName = this.config.getCollectionNamingStrategy().resolveCollectionName(record, config.getCollectionNameFormat());
         if (tableName == null) {
            return Optional.empty();
        }
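
With getCollectionIdFromRecord() accepting any DebeziumSinkRecord and the naming strategy resolving against the record plus the configured collection name format, the per-record flow of execute() condenses to type checks on the record itself. A compact sketch of that dispatch, assuming the surrounding JdbcChangeEventSink fields; buffering, flushing, and error handling are elided:

void dispatch(SinkRecord kafkaSinkRecord) {
    JdbcSinkRecord record = new JdbcKafkaSinkRecord(kafkaSinkRecord,
            config.getPrimaryKeyMode(), config.getPrimaryKeyFields(),
            config.getFieldFilter(), dialect);

    validate(record);                  // throws on schema change records

    if (record.isTombstone()) {        // skip Debezium envelope tombstones
        return;
    }
    if (record.isTruncate()) {         // only when config.isTruncateEnabled()
        // flush pending buffers, then write the dialect's TRUNCATE statement
    }
    else if (record.isDelete()) {      // only when config.isDeleteEnabled()
        // route through the per-table delete buffer
    }
    else {
        // create/update: route through the per-table upsert buffer
    }
}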