Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,64 @@ public void testSchemaChangeEvents() throws Exception {
"DropTableEvent{tableId=%s.products}");
}

@Test
public void testDroppingTable() throws Exception {
Thread.sleep(5000);
LOG.info("Sleep 5 seconds to distinguish initial DDL events with dropping table events...");
long ddlTimestamp = System.currentTimeMillis();
Thread.sleep(5000);
LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);

try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE products;");
}

String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ " scan.startup.mode: timestamp\n"
+ " scan.startup.timestamp-millis: %d\n"
+ " scan.binlog.newly-added-table.enabled: true\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
ddlTimestamp,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUntilSpecificEvent(
String.format(
"Table %s.products received SchemaChangeEvent DropTableEvent{tableId=%s.products} and start to be blocked.",
mysqlInventoryDatabase.getDatabaseName(),
mysqlInventoryDatabase.getDatabaseName()));

waitUntilSpecificEvent(
String.format(
"Schema change event DropTableEvent{tableId=%s.products} has been handled in another subTask already.",
mysqlInventoryDatabase.getDatabaseName()));
}

private void validateResult(String... expectedEvents) throws Exception {
String dbName = mysqlInventoryDatabase.getDatabaseName();
for (String event : expectedEvents) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
Expand Down Expand Up @@ -242,7 +244,13 @@ private void processSchemaChangeEvents(SchemaChangeEvent event)
tableId,
event);
handleSchemaChangeEvent(tableId, event);
// Update caches

if (event instanceof DropTableEvent) {
// Update caches unless event is a Drop table event. In that case, no schema will be
// available / necessary.
return;
}

originalSchema.put(tableId, getLatestOriginalSchema(tableId));
schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));

Expand Down Expand Up @@ -440,7 +448,8 @@ private SchemaChangeResponse requestSchemaChange(
long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
sendRequestToCoordinator(
new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
if (response.isRegistryBusy()) {
if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
LOG.info(
Expand Down Expand Up @@ -609,4 +618,10 @@ public Object getFieldOrNull(RecordData recordData) {
}
}
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
// Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
// is guaranteed not to be mixed together.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -78,7 +77,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
/**
* Atomic flag indicating if current RequestHandler could accept more schema changes for now.
*/
private final AtomicReference<RequestStatus> schemaChangeStatus;
private volatile RequestStatus schemaChangeStatus;

private final List<Integer> pendingSubTaskIds;
private final Object schemaChangeRequestLock;

private volatile Throwable currentChangeException;
private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
Expand Down Expand Up @@ -110,7 +112,10 @@ public SchemaRegistryRequestHandler(
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
this.currentFinishedSchemaChanges = new ArrayList<>();
this.currentIgnoredSchemaChanges = new ArrayList<>();
this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);

this.schemaChangeStatus = RequestStatus.IDLE;
this.pendingSubTaskIds = new ArrayList<>();
this.schemaChangeRequestLock = new Object();
}

/**
Expand All @@ -120,67 +125,100 @@ public SchemaRegistryRequestHandler(
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
"Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();

// If this schema change event has been requested by another subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
clearCurrentSchemaChangeRequest();
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not "
+ schemaChangeStatus.get());

// We use requester subTask ID as the pending ticket, because there will be at most 1 schema
// change requests simultaneously from each subTask
int requestSubTaskId = request.getSubTaskId();

synchronized (schemaChangeRequestLock) {
// Make sure we handle the first request in the pending list to avoid out-of-order
// waiting and blocks checkpointing mechanism.
if (schemaChangeStatus == RequestStatus.IDLE) {
if (pendingSubTaskIds.isEmpty()) {
LOG.info(
"Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
request.getSchemaChangeEvent(),
request.getTableId().toString(),
requestSubTaskId);
} else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
LOG.info(
"Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
request.getSchemaChangeEvent(),
request.getTableId().toString(),
requestSubTaskId);
pendingSubTaskIds.remove(0);
} else {
LOG.info(
"Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
request.getSchemaChangeEvent(),
request.getTableId().toString(),
requestSubTaskId,
pendingSubTaskIds);
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}

SchemaChangeEvent event = request.getSchemaChangeEvent();

// If this schema change event has been requested by another subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
clearCurrentSchemaChangeRequest();
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());

// If this schema change event is filtered out by LENIENT mode or merging table
// route strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
clearCurrentSchemaChangeRequest();
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}

LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());

// If this schema change event is filtered out by LENIENT mode or merging table route
// strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
clearCurrentSchemaChangeRequest();
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not "
+ schemaChangeStatus.get());
"SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
// This request has been accepted.
schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;

// Backfill pre-schema info for sink applying
derivedSchemaChangeEvents.forEach(
e -> {
if (e instanceof SchemaChangeEventWithPreSchema) {
SchemaChangeEventWithPreSchema pe =
(SchemaChangeEventWithPreSchema) e;
if (!pe.hasPreSchema()) {
schemaManager
.getLatestEvolvedSchema(pe.tableId())
.ifPresent(pe::fillPreSchema);
}
}
});
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
"Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
request,
requestSubTaskId,
pendingSubTaskIds);
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}

// Backfill pre-schema info for sink applying
derivedSchemaChangeEvents.forEach(
e -> {
if (e instanceof SchemaChangeEventWithPreSchema) {
SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e;
if (!pe.hasPreSchema()) {
schemaManager
.getLatestEvolvedSchema(pe.tableId())
.ifPresent(pe::fillPreSchema);
}
}
});
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"Schema Registry is busy processing a schema change request, could not handle request {} for now.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
}

Expand Down Expand Up @@ -227,9 +265,10 @@ private void applySchemaChange(
}
}
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
schemaChangeStatus == RequestStatus.APPLYING,
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
+ schemaChangeStatus.get());
+ schemaChangeStatus);
schemaChangeStatus = RequestStatus.FINISHED;
LOG.info(
"SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
currentDerivedSchemaChangeEvents);
Expand Down Expand Up @@ -262,10 +301,11 @@ public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
}
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ schemaChangeStatus);

schemaChangeStatus = RequestStatus.APPLYING;
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
Expand All @@ -276,9 +316,10 @@ public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {

public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
Preconditions.checkState(
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
if (schemaChangeStatus == RequestStatus.FINISHED) {
schemaChangeStatus = RequestStatus.IDLE;
LOG.info(
"SchemaChangeStatus switched from FINISHED to IDLE for request {}",
currentDerivedSchemaChangeEvents);
Expand Down
Loading