Commit 6171337
DBZ-7856 Remove signal offset support for KafkaSignalChannel
mfvitale authored and jpechane committed Dec 5, 2024
1 parent 3166ffb commit 6171337
Showing 7 changed files with 6 additions and 105 deletions.
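
This change removes the signal channel's bespoke offset bookkeeping: the SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED option and the reset() hook are dropped from KafkaSignalChannel, the persisted signal offset is removed from BinlogReadOnlyIncrementalSnapshotContext, and the resetOffset()/getReadOnlyIncrementalSnapshotSignalOffset() plumbing disappears from the binlog, MariaDB, and MySQL connector tasks. The signal consumer now always runs with enable.auto.commit=true and auto.offset.reset=earliest, so signal delivery is tracked by the committed Kafka consumer-group offset instead.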
=== File 1 of 7 ===

@@ -19,7 +19,6 @@
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
-import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
@@ -142,13 +141,11 @@ public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, Sna
throws InterruptedException {
final Map<String, Object> additionalData = signalPayload.additionalData;
super.addDataCollectionNamesToSnapshot(signalPayload, snapshotConfiguration);
-getContext().setSignalOffset((Long) additionalData.get(KafkaSignalChannel.CHANNEL_OFFSET));
}

@Override
public void requestStopSnapshot(P partition, OffsetContext offsetContext, Map<String, Object> additionalData, List<String> dataCollectionIds) {
super.requestStopSnapshot(partition, offsetContext, additionalData, dataCollectionIds);
-getContext().setSignalOffset((Long) additionalData.get(KafkaSignalChannel.CHANNEL_OFFSET));
}

private BinlogReadOnlyIncrementalSnapshotContext<TableId> getContext() {
=== File 2 of 7 ===

@@ -19,26 +19,12 @@
*/
public abstract class BinlogReadOnlyIncrementalSnapshotContext<T> extends AbstractIncrementalSnapshotContext<T> {

-public static final String SIGNAL_OFFSET = INCREMENTAL_SNAPSHOT_KEY + "_signal_offset";
-
-private Long signalOffset;
-
public BinlogReadOnlyIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
super(useCatalogBeforeSchema);
}

-public Long getSignalOffset() {
-return signalOffset;
-}
-
-public void setSignalOffset(Long signalOffset) {
-this.signalOffset = signalOffset;
-}
-
public Map<String, Object> store(Map<String, Object> offset) {
-Map<String, Object> snapshotOffset = super.store(offset);
-snapshotOffset.put(SIGNAL_OFFSET, signalOffset);
-return snapshotOffset;
+return super.store(offset);
}

public String getCurrentGtid(OffsetContext offsetContext) {
@@ -100,8 +86,7 @@ public String getCurrentGtid(OffsetContext offsetContext) {
protected static <U> IncrementalSnapshotContext<U> init(BinlogReadOnlyIncrementalSnapshotContext<U> context,
Map<String, ?> offsets) {
AbstractIncrementalSnapshotContext.init(context, offsets);
-final Long signalOffset = (Long) offsets.get(SIGNAL_OFFSET);
-context.setSignalOffset(signalOffset);
+
return context;
}
}
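
Note: with the signal offset gone, the read-only incremental snapshot context no longer writes a "_signal_offset" entry into the connector offsets; store() and init() now simply delegate to the parent implementation.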
=== File 3 of 7 ===

@@ -14,8 +14,6 @@
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.heartbeat.HeartbeatErrorHandler;
-import io.debezium.pipeline.signal.SignalProcessor;
-import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.spi.snapshot.Snapshotter;
@@ -77,33 +75,6 @@ protected void validateBinlogConfiguration(Snapshotter snapshotter, BinlogConnec
}
}

-/**
-* Reset the specified offset.
-*
-* @param connectorConfig the connector configuration, should not be null
-* @param previousOffset the previous offsets
-* @param signalProcessor the signal processor, should not be null
-*/
-protected void resetOffset(BinlogConnectorConfig connectorConfig, O previousOffset, SignalProcessor<P, O> signalProcessor) {
-boolean isKafkaChannelEnabled = connectorConfig.getEnabledChannels().contains(KafkaSignalChannel.CHANNEL_NAME);
-if (previousOffset != null && isKafkaChannelEnabled && connectorConfig.isReadOnlyConnection()) {
-KafkaSignalChannel kafkaSignal = signalProcessor.getSignalChannel(KafkaSignalChannel.class);
-Long signalOffset = getReadOnlyIncrementalSnapshotSignalOffset(previousOffset);
-if (signalOffset != null) {
-LOGGER.info("Resetting Kafka Signal offset to {}", signalOffset);
-kafkaSignal.reset(signalOffset);
-}
-}
-}
-
-/**
-* Obtain the read-only incremental snapshot signal offset.
-*
-* @param previousOffset the previous offsets
-* @return the read-only incremental snapshot signal offset
-*/
-protected abstract Long getReadOnlyIncrementalSnapshotSignalOffset(O previousOffset);
-
/**
* Common heartbeat error handler for binlog-based connectors.
*/
=== File 4 of 7 ===

@@ -119,25 +119,6 @@ public void givenOffsetCommitDisabledAndASignalSentWithConnectorRunning_whenConn
assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
}

-@Test
-public void givenOffsetCommitDisabledAndASignalSentWithConnectorDown_whenConnectorComesBackUp_thenNoSignalsProcessed()
-throws ExecutionException, InterruptedException {
-
-final String signalTopic = "signals_topic-2";
-final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
-sendExecuteSnapshotKafkaSignal("b", signalTopic);
-Thread.sleep(5000);
-startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
-.with(KafkaSignalChannel.SIGNAL_TOPIC, signalTopic)
-.with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
-assertConnectorIsRunning();
-waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
-
-final SourceRecords records = consumeRecordsByTopic(2);
-assertThat(records.allRecordsInOrder()).hasSize(0);
-assertThat(logInterceptor.countOccurrences("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isEqualTo(0);
-}
-
@Test
public void givenOffsetCommitEnabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
throws ExecutionException, InterruptedException {
@@ -146,7 +127,6 @@ public void givenOffsetCommitEnabledAndASignalSentWithConnectorRunning_whenConne
final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
.with(KafkaSignalChannel.SIGNAL_TOPIC, signalTopic)
-.with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
.with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
assertConnectorIsRunning();
sendExecuteSnapshotKafkaSignal("b", signalTopic);
@@ -164,7 +144,6 @@ public void givenOffsetCommitEnabledAndMultipleSignalsSentWithConnectorRunning_w
sendExecuteSnapshotKafkaSignal("b", signalTopic);
startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
.with(KafkaSignalChannel.SIGNAL_TOPIC, signalTopic)
-.with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
.with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
assertConnectorIsRunning();
waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
@@ -174,7 +153,6 @@
sendExecuteSnapshotKafkaSignal("c", signalTopic);
startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
.with(KafkaSignalChannel.SIGNAL_TOPIC, signalTopic)
-.with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
.with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));

assertConnectorIsRunning();
@@ -194,7 +172,6 @@ public void givenOffsetCommitEnabledAndASignalSentWithConnectorNotRunning_whenCo
sendExecuteSnapshotKafkaSignal("b", signalTopic);
startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
.with(KafkaSignalChannel.SIGNAL_TOPIC, signalTopic)
-.with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
.with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
assertConnectorIsRunning();

=== File 5 of 7 ===

@@ -180,7 +180,6 @@ protected ChangeEventSourceCoordinator<MariaDbPartition, MariaDbOffsetContext> s
getAvailableSignalChannels(),
DocumentReader.defaultReader(),
previousOffsets);
-resetOffset(connectorConfig, previousOffset, signalProcessor);

final Configuration heartbeatConfig = config;
final EventDispatcher<MariaDbPartition, TableId> dispatcher = new EventDispatcher<>(
@@ -272,12 +271,6 @@ protected List<SourceRecord> doPoll() throws InterruptedException {
return records.stream().map(DataChangeEvent::getRecord).collect(Collectors.toList());
}

-@Override
-@SuppressWarnings("unchecked")
-protected Long getReadOnlyIncrementalSnapshotSignalOffset(MariaDbOffsetContext previousOffset) {
-return ((MariaDbReadOnlyIncrementalSnapshotContext<TableId>) previousOffset.getIncrementalSnapshotContext()).getSignalOffset();
-}
-
private MariaDbValueConverters getValueConverters(MariaDbConnectorConfig connectorConfig) {
return new MariaDbValueConverters(
connectorConfig.getDecimalMode(),
=== File 6 of 7 ===

@@ -171,7 +171,6 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
getAvailableSignalChannels(),
DocumentReader.defaultReader(),
previousOffsets);
-resetOffset(connectorConfig, previousOffset, signalProcessor);

final Configuration heartbeatConfig = config;
final EventDispatcher<MySqlPartition, TableId> dispatcher = new EventDispatcher<>(
@@ -273,9 +272,4 @@ protected Iterable<Field> getAllConfigurationFields() {
return MySqlConnectorConfig.ALL_FIELDS;
}

-@Override
-@SuppressWarnings("unchecked")
-protected Long getReadOnlyIncrementalSnapshotSignalOffset(MySqlOffsetContext previousOffset) {
-return ((MySqlReadOnlyIncrementalSnapshotContext<TableId>) previousOffset.getIncrementalSnapshotContext()).getSignalOffset();
-}
}
=== File 7 of 7 ===

@@ -91,15 +91,6 @@ public class KafkaSignalChannel implements SignalChannelReader {
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Consumer group id for the signal topic")
.withDefault("kafka-signal");
-public static final Field SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.consumer.offset.commit.enabled")
-.withDisplayName("Enable offset commit for the signal topic")
-.withType(ConfigDef.Type.BOOLEAN)
-.withDefault(false)
-.withWidth(ConfigDef.Width.SHORT)
-.withImportance(ConfigDef.Importance.MEDIUM)
-.withDescription("Enable the offset commit for the signal topic in order to guarantee At-Least-Once delivery." +
-"If disabled, only signals received when the consumer is up&running are processed. Any signals received when the consumer is down are lost.")
-.withValidation(Field::isRequired);

private Optional<SignalRecord> processSignal(ConsumerRecord<String, String> record) {

@@ -164,12 +155,6 @@ public void init(CommonConnectorConfig connectorConfig) {
signalsConsumer.assign(Collect.arrayListOf(new TopicPartition(topicName, 0)));
}

-@Override
-public void reset(Object reference) {
-
-signalsConsumer.seek(new TopicPartition(topicName, 0), ((Long) reference) + 1);
-}
-
private static Configuration buildKafkaConfiguration(Configuration signalConfig) {

Configuration.Builder confBuilder = signalConfig.subset(CONSUMER_PREFIX, true).edit()
@@ -180,11 +165,10 @@ private static Configuration buildKafkaConfiguration(Configuration signalConfig)
.withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
.withDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000) // readjusted since 0.10.1.0
.withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
-.withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-if (signalConfig.getBoolean(SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED)) {
-confBuilder.withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
-.withDefault(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-}
+.withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
+.withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+.withDefault(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
return confBuilder
.build();
}
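
For reference, below is a minimal, self-contained sketch (not Debezium code) of the consumer semantics the channel now applies unconditionally. The bootstrap servers and topic name are illustrative placeholders; the group id, session timeout, deserializers, auto-commit, and offset-reset settings mirror the defaults visible in the hunks above.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class SignalConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-signal"); // channel's default group id
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Offsets are now always committed, so a restarted consumer resumes from the
        // committed group offset and signals sent while it was down are still delivered.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // With no committed offset yet, start from the beginning of the signal topic.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Like the channel, read partition 0 of the signal topic via assign().
            consumer.assign(List.of(new TopicPartition("signals_topic", 0))); // topic name illustrative
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.printf("signal key=%s offset=%d value=%s%n",
                        record.key(), record.offset(), record.value());
            }
        }
    }
}

The committed group offset thus replaces the signal offset that was previously persisted in the connector offsets and re-seeked through reset().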