Skip to content

Commit 3bc646f

Browse files
committed
[hotfix] Run schema coordinator logic asynchronously to avoid blocking the main thread
1 parent 963c7ff commit 3bc646f

File tree

7 files changed

+314
-104
lines changed

7 files changed

+314
-104
lines changed

flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.SortedMap;
4141
import java.util.TreeMap;
4242
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.Executors;
4344
import java.util.stream.Collectors;
4445

4546
/** Dummy classes for migration test. Called via reflection. */
@@ -69,6 +70,7 @@ public SchemaRegistry generateSchemaRegistry() {
6970
return new SchemaRegistry(
7071
"Dummy Name",
7172
null,
73+
Executors.newFixedThreadPool(1),
7274
new MetadataApplier() {
7375
@Override
7476
public boolean acceptsSchemaEvolutionType(

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java

Lines changed: 155 additions & 77 deletions
Large diffs are not rendered by default.

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
import org.apache.flink.cdc.common.sink.MetadataApplier;
2424
import org.apache.flink.runtime.jobgraph.OperatorID;
2525
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
26+
import org.apache.flink.util.FatalExitExceptionHandler;
2627

2728
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ThreadFactory;
2832

2933
/** Provider of {@link SchemaRegistry}. */
3034
@Internal
@@ -57,7 +61,55 @@ public OperatorID getOperatorId() {
5761

5862
@Override
5963
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
64+
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
65+
new CoordinatorExecutorThreadFactory(
66+
"schema-evolution-coordinator", context.getUserCodeClassloader());
67+
ExecutorService coordinatorExecutor =
68+
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
6069
return new SchemaRegistry(
61-
operatorName, context, metadataApplier, routingRules, schemaChangeBehavior);
70+
operatorName,
71+
context,
72+
coordinatorExecutor,
73+
metadataApplier,
74+
routingRules,
75+
schemaChangeBehavior);
76+
}
77+
78+
/** A thread factory class that provides some helper methods. */
79+
public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
80+
81+
private final String coordinatorThreadName;
82+
private final ClassLoader cl;
83+
private final Thread.UncaughtExceptionHandler errorHandler;
84+
85+
private Thread t;
86+
87+
CoordinatorExecutorThreadFactory(
88+
final String coordinatorThreadName, final ClassLoader contextClassLoader) {
89+
this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
90+
}
91+
92+
CoordinatorExecutorThreadFactory(
93+
final String coordinatorThreadName,
94+
final ClassLoader contextClassLoader,
95+
final Thread.UncaughtExceptionHandler errorHandler) {
96+
this.coordinatorThreadName = coordinatorThreadName;
97+
this.cl = contextClassLoader;
98+
this.errorHandler = errorHandler;
99+
}
100+
101+
@Override
102+
public synchronized Thread newThread(Runnable r) {
103+
if (t != null) {
104+
throw new Error(
105+
"This indicates that a fatal error has happened and caused the "
106+
+ "coordinator executor thread to exit. Check the earlier logs"
107+
+ "to see the root cause of the problem.");
108+
}
109+
t = new Thread(r, coordinatorThreadName);
110+
t.setContextClassLoader(cl);
111+
t.setUncaughtExceptionHandler(errorHandler);
112+
return t;
113+
}
62114
}
63115
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
3939
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
4040
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
41+
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
4142

4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
@@ -94,15 +95,19 @@ public class SchemaRegistryRequestHandler implements Closeable {
9495

9596
private final SchemaChangeBehavior schemaChangeBehavior;
9697

98+
private final OperatorCoordinator.Context context;
99+
97100
public SchemaRegistryRequestHandler(
98101
MetadataApplier metadataApplier,
99102
SchemaManager schemaManager,
100103
SchemaDerivation schemaDerivation,
101-
SchemaChangeBehavior schemaChangeBehavior) {
104+
SchemaChangeBehavior schemaChangeBehavior,
105+
OperatorCoordinator.Context context) {
102106
this.metadataApplier = metadataApplier;
103107
this.schemaManager = schemaManager;
104108
this.schemaDerivation = schemaDerivation;
105109
this.schemaChangeBehavior = schemaChangeBehavior;
110+
this.context = context;
106111

107112
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
108113
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
@@ -122,8 +127,8 @@ public SchemaRegistryRequestHandler(
122127
*
123128
* @param request the received SchemaChangeRequest
124129
*/
125-
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
126-
SchemaChangeRequest request) {
130+
public void handleSchemaChangeRequest(
131+
SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {
127132

128133
// We use requester subTask ID as the pending ticket, because there will be at most 1 schema
129134
// change requests simultaneously from each subTask
@@ -156,7 +161,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
156161
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
157162
pendingSubTaskIds.add(requestSubTaskId);
158163
}
159-
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
164+
response.complete(wrap(SchemaChangeResponse.busy()));
165+
return;
160166
}
161167

162168
SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -168,8 +174,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
168174
LOG.info(
169175
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
170176
request);
171-
return CompletableFuture.completedFuture(
172-
wrap(SchemaChangeResponse.duplicate()));
177+
response.complete(wrap(SchemaChangeResponse.duplicate()));
178+
return;
173179
}
174180
schemaManager.applyOriginalSchemaChange(event);
175181
List<SchemaChangeEvent> derivedSchemaChangeEvents =
@@ -183,16 +189,18 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
183189
LOG.info(
184190
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
185191
request);
186-
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
192+
193+
response.complete(wrap(SchemaChangeResponse.ignored()));
194+
return;
187195
}
188196

189197
LOG.info(
190198
"SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
191199
// This request has been accepted.
192200
schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
193201
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
194-
return CompletableFuture.completedFuture(
195-
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
202+
203+
response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
196204
} else {
197205
LOG.info(
198206
"Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
@@ -202,7 +210,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
202210
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
203211
pendingSubTaskIds.add(requestSubTaskId);
204212
}
205-
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
213+
response.complete(wrap(SchemaChangeResponse.busy()));
206214
}
207215
}
208216
}
@@ -299,7 +307,7 @@ public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
299307
}
300308
}
301309

302-
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
310+
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
303311
Preconditions.checkState(
304312
schemaChangeStatus != RequestStatus.IDLE,
305313
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
@@ -311,11 +319,12 @@ public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
311319

312320
// This request has been finished, return it and prepare for the next request
313321
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
314-
return CompletableFuture.supplyAsync(
315-
() -> wrap(new SchemaChangeResultResponse(finishedEvents)));
322+
SchemaChangeResultResponse resultResponse =
323+
new SchemaChangeResultResponse(finishedEvents);
324+
response.complete(wrap(resultResponse));
316325
} else {
317326
// Still working on schema change request, waiting it
318-
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
327+
response.complete(wrap(new SchemaChangeProcessingResponse()));
319328
}
320329
}
321330

@@ -444,7 +453,8 @@ private boolean shouldIgnoreException(Throwable throwable) {
444453

445454
private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
446455
if (currentChangeException != null) {
447-
throw new RuntimeException("Failed to apply schema change.", currentChangeException);
456+
context.failJob(
457+
new RuntimeException("Failed to apply schema change.", currentChangeException));
448458
}
449459
List<SchemaChangeEvent> finishedSchemaChanges =
450460
new ArrayList<>(currentFinishedSchemaChanges);

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
3131
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
3232
import org.apache.flink.cdc.common.event.TableId;
33+
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
3334
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
3435
import org.apache.flink.cdc.common.schema.Column;
3536
import org.apache.flink.cdc.common.schema.Schema;
@@ -1035,11 +1036,16 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
10351036
new AddColumnEvent.ColumnWithPosition(
10361037
Column.physicalColumn(
10371038
"height", DOUBLE, "Height data")))));
1038-
Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents))
1039+
processEvent(schemaOperator, addColumnEvents);
1040+
Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
1041+
Assertions.assertThat(harness.getJobFailureCause())
10391042
.cause()
1040-
.cause()
1041-
.isExactlyInstanceOf(RuntimeException.class)
1042-
.hasMessageContaining("Failed to apply schema change");
1043+
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
1044+
.matches(
1045+
e ->
1046+
((UnsupportedSchemaChangeEventException) e)
1047+
.getExceptionMessage()
1048+
.equals("Sink doesn't support such schema change event."));
10431049
harness.close();
10441050
}
10451051

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.flink.configuration.Configuration;
3939
import org.apache.flink.runtime.jobgraph.OperatorID;
4040
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
41-
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
4241
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
4342
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
4443
import org.apache.flink.streaming.api.operators.Output;
@@ -56,6 +55,7 @@
5655
import java.util.ArrayList;
5756
import java.util.LinkedList;
5857
import java.util.Set;
58+
import java.util.concurrent.Executors;
5959

6060
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;
6161

@@ -81,6 +81,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
8181
private final SchemaRegistry schemaRegistry;
8282
private final TestingSchemaRegistryGateway schemaRegistryGateway;
8383
private final LinkedList<StreamRecord<E>> outputRecords = new LinkedList<>();
84+
private final MockedOperatorCoordinatorContext mockedContext;
8485

8586
public EventOperatorTestHarness(OP operator, int numOutputs) {
8687
this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
@@ -94,11 +95,14 @@ public EventOperatorTestHarness(
9495
OP operator, int numOutputs, Duration duration, SchemaChangeBehavior behavior) {
9596
this.operator = operator;
9697
this.numOutputs = numOutputs;
98+
this.mockedContext =
99+
new MockedOperatorCoordinatorContext(
100+
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
97101
schemaRegistry =
98102
new SchemaRegistry(
99103
"SchemaOperator",
100-
new MockOperatorCoordinatorContext(
101-
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
104+
mockedContext,
105+
Executors.newFixedThreadPool(1),
102106
new CollectingMetadataApplier(duration),
103107
new ArrayList<>(),
104108
behavior);
@@ -113,11 +117,14 @@ public EventOperatorTestHarness(
113117
Set<SchemaChangeEventType> enabledEventTypes) {
114118
this.operator = operator;
115119
this.numOutputs = numOutputs;
120+
this.mockedContext =
121+
new MockedOperatorCoordinatorContext(
122+
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
116123
schemaRegistry =
117124
new SchemaRegistry(
118125
"SchemaOperator",
119-
new MockOperatorCoordinatorContext(
120-
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
126+
mockedContext,
127+
Executors.newFixedThreadPool(1),
121128
new CollectingMetadataApplier(duration, enabledEventTypes),
122129
new ArrayList<>(),
123130
behavior);
@@ -133,11 +140,14 @@ public EventOperatorTestHarness(
133140
Set<SchemaChangeEventType> errorsOnEventTypes) {
134141
this.operator = operator;
135142
this.numOutputs = numOutputs;
143+
this.mockedContext =
144+
new MockedOperatorCoordinatorContext(
145+
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
136146
schemaRegistry =
137147
new SchemaRegistry(
138148
"SchemaOperator",
139-
new MockOperatorCoordinatorContext(
140-
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
149+
mockedContext,
150+
Executors.newFixedThreadPool(1),
141151
new CollectingMetadataApplier(
142152
duration, enabledEventTypes, errorsOnEventTypes),
143153
new ArrayList<>(),
@@ -196,6 +206,14 @@ public Schema getLatestEvolvedSchema(TableId tableId) throws Exception {
196206
.orElse(null);
197207
}
198208

209+
public boolean isJobFailed() {
210+
return mockedContext.isJobFailed();
211+
}
212+
213+
public Throwable getJobFailureCause() {
214+
return mockedContext.getFailureCause();
215+
}
216+
199217
@Override
200218
public void close() throws Exception {
201219
operator.close();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.runtime.testutils.operators;
19+
20+
import org.apache.flink.runtime.jobgraph.OperatorID;
21+
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
22+
23+
/**
24+
* This is a mocked version of Operator coordinator context that stores failure cause for testing
25+
* purposes only.
26+
*/
27+
public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {
28+
public MockedOperatorCoordinatorContext(
29+
OperatorID operatorID, ClassLoader userCodeClassLoader) {
30+
super(operatorID, userCodeClassLoader);
31+
}
32+
33+
private Throwable failureCause;
34+
35+
@Override
36+
public void failJob(Throwable cause) {
37+
super.failJob(cause);
38+
failureCause = cause;
39+
}
40+
41+
public Throwable getFailureCause() {
42+
return failureCause;
43+
}
44+
}

0 commit comments

Comments
 (0)