Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -114,7 +115,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit,
createEventFilter(currentBinlogSplit.getStartingOffset()));
createEventFilter());

executorService.submit(
() -> {
Expand Down Expand Up @@ -306,17 +307,35 @@ private void configureFilter() {
this.pureBinlogPhaseTables.clear();
}

private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
private Predicate<Event> createEventFilter() {
// If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
// events earlier than the specified timestamp.
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
// Notes:
// 1. Heartbeat event doesn't contain timestamp, so we just keep it
// 2. Timestamp of event is in epoch millisecond
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;

// NOTE: Here we take user's configuration (statefulTaskContext.getSourceConfig())
// as the ground truth. This might be fragile if user changes the config and recover
// the job from savepoint / checkpoint, as there might be conflict between user's config
// and the state in savepoint / checkpoint. But as we don't promise compatibility of
// checkpoint after changing the config, this is acceptable for now.
StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
if (startupOptions.binlogOffset == null) {
throw new NullPointerException(
"The startup option was set to TIMESTAMP "
+ "but unable to find starting binlog offset. Please check if the timestamp is specified in "
+ "configuration. ");
}
long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
// We only skip data change event, as other kinds of events are necessary for updating
// some internal state inside MySqlStreamingChangeEventSource
LOG.info(
"Creating event filter that dropping row mutation events before timestamp in second {}",
startTimestampSec);
return event -> {
if (!EventType.isRowMutation(getEventType(event))) {
return true;
}
return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
};
}
return event -> true;
}
Expand All @@ -327,8 +346,17 @@ public void stopBinlogReadTask() {
changeEventSourceContext.stopChangeEventSource();
}

private EventType getEventType(Event event) {
return event.getHeader().getEventType();
}

@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}

@VisibleForTesting
MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
return binlogSplitReadTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.debezium.task;

import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.StoppableChangeEventSourceContext;
Expand Down Expand Up @@ -119,4 +120,9 @@ protected void handleEvent(
private boolean isBoundedRead() {
return !isNonStoppingOffset(binlogSplit.getEndingOffset());
}

@VisibleForTesting
public Predicate<Event> getEventFilter() {
return eventFilter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
import org.apache.flink.util.ExceptionUtils;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
Expand Down Expand Up @@ -82,6 +86,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -841,6 +846,50 @@ public void testReadBinlogFromUnavailableBinlog() throws Exception {
}
}

@Test
public void testRestoreFromCheckpointWithTimestampStartingOffset() throws Exception {
// Preparations
inventoryDatabase8.createAndInitialize();
MySqlSourceConfig connectionConfig =
getConfig(MYSQL8_CONTAINER, inventoryDatabase8, new String[] {"products"});
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);

// Capture the current binlog offset, and use it to mock restoring from checkpoint
BinlogOffset checkpointOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection);

// Create a config to start reading from timestamp
long startTimestampMs = 15213L;
MySqlSourceConfig sourceConfig =
getConfig(
MYSQL8_CONTAINER,
inventoryDatabase8,
StartupOptions.timestamp(startTimestampMs),
new String[] {"products"});

BinlogSplitReader binlogReader = createBinlogReader(sourceConfig);
MySqlBinlogSplit checkpointSplit =
createBinlogSplit(
getConfig(
MYSQL8_CONTAINER,
inventoryDatabase8,
StartupOptions.specificOffset(checkpointOffset),
new String[] {"products"}));

// Restore binlog reader from checkpoint
binlogReader.submitSplit(checkpointSplit);

// We mock a WRITE_ROWS event with timestamp = 1, which should be dropped by filter
EventHeaderV4 header = new EventHeaderV4();
header.setEventType(EventType.WRITE_ROWS);
header.setTimestamp(1L);
Event event = new Event(header, new WriteRowsEventData());

// Check if the filter works
Predicate<Event> eventFilter = binlogReader.getBinlogSplitReadTask().getEventFilter();
assertThat(eventFilter.test(event)).isFalse();
}

private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
return createBinlogReader(sourceConfig, false);
}
Expand Down