Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-6655 Log error and throw a DataException on a schema change record
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and Naros committed Jul 26, 2023
1 parent acc239f commit c04f347
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand All @@ -33,6 +34,8 @@
public class JdbcSinkConnectorTask extends SinkTask {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkConnectorTask.class);
public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";

private enum State {
RUNNING,
Expand Down Expand Up @@ -86,6 +89,7 @@ public void put(Collection<SinkRecord> records) {
final SinkRecord record = iterator.next();
LOGGER.trace("Received {}", record);
try {
validate(record);
changeEventSink.execute(record);
markProcessed(record);
}
Expand All @@ -103,6 +107,18 @@ public void put(Collection<SinkRecord> records) {
}
}

/**
 * Rejects records this connector cannot handle before they reach the sink.
 *
 * @param record the incoming sink record to check
 * @throws DataException if the record is a Debezium schema change event,
 *         which the JDBC sink connector does not support
 */
private void validate(SinkRecord record) {
    if (!isSchemaChange(record)) {
        return;
    }
    // Surface the problem in the log before failing the task, so the operator
    // sees the remediation hint (adjust `topics` / `topics.regex`).
    LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
    throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
}

/**
 * Determines whether the given record is a Debezium schema change event.
 * <p>
 * Schema change events carry a value schema whose name ends with
 * {@code SchemaChangeValue} (fully qualified, e.g.
 * {@code io.debezium.connector.mysql.SchemaChangeValue}).
 *
 * @param record the sink record to inspect; its value schema may be {@code null}
 *               for tombstones or schemaless records
 * @return {@code true} if the record's value schema identifies a schema change event
 */
private static boolean isSchemaChange(SinkRecord record) {
    // BUGFIX: the containment check was reversed. The original
    // SCHEMA_CHANGE_VALUE.contains(name) only matched names that are
    // substrings of "SchemaChangeValue", so fully-qualified schema names
    // (e.g. "io.debezium.connector.mysql.SchemaChangeValue") were never
    // detected, while unrelated short names such as "Value" matched.
    return record.valueSchema() != null
            && !Strings.isNullOrEmpty(record.valueSchema().name())
            && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// Flush only up to the records processed by this sink
Expand Down

0 comments on commit c04f347

Please sign in to comment.