5555import java .time .Duration ;
5656import java .util .ArrayList ;
5757import java .util .Collection ;
58+ import java .util .HashMap ;
5859import java .util .List ;
5960import java .util .Map ;
6061import java .util .Set ;
6162import java .util .concurrent .CompletableFuture ;
6263import java .util .concurrent .ConcurrentHashMap ;
6364import java .util .concurrent .ExecutorService ;
65+ import java .util .concurrent .Executors ;
6466import java .util .concurrent .TimeoutException ;
6567import java .util .concurrent .atomic .AtomicReference ;
6668import java .util .stream .Collectors ;
@@ -98,6 +100,9 @@ public class SchemaCoordinator extends SchemaRegistry {
98100 private transient Multimap <Tuple2 <Integer , SchemaChangeEvent >, Integer >
99101 alreadyHandledSchemaChangeEvents ;
100102
103+ /** Single-threaded executor on which schema changes are applied. */
104+ private final ExecutorService schemaChangeThreadPool ;
105+
101106 public SchemaCoordinator (
102107 String operatorName ,
103108 OperatorCoordinator .Context context ,
@@ -114,6 +119,7 @@ public SchemaCoordinator(
114119 routingRules ,
115120 schemaChangeBehavior ,
116121 rpcTimeout );
122+ this .schemaChangeThreadPool = Executors .newSingleThreadExecutor ();
117123 }
118124
119125 // -----------------
@@ -131,6 +137,14 @@ public void start() throws Exception {
131137 "Started SchemaRegistry for {}. Parallelism: {}" , operatorName , currentParallelism );
132138 }
133139
140+ @ Override
141+ public void close () throws Exception {
142+ super .close ();
143+ if (schemaChangeThreadPool != null && !schemaChangeThreadPool .isShutdown ()) {
144+ schemaChangeThreadPool .shutdownNow ();
145+ }
146+ }
147+
134148 // --------------------------
135149 // Checkpoint related methods
136150 // --------------------------
@@ -268,7 +282,20 @@ private void handleSchemaEvolveRequest(
268282 LOG .info (
269283 "Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING." ,
270284 request );
271- startSchemaChange ();
285+
286+ schemaChangeThreadPool .submit (
287+ () -> {
288+ try {
289+ startSchemaChange ();
290+ } catch (Throwable t ) {
291+ failJob (
292+ "Schema change applying task" ,
293+ new FlinkRuntimeException (
294+ "Failed to apply schema change event." , t ));
295+ throw new FlinkRuntimeException (
296+ "Failed to apply schema change event." , t );
297+ }
298+ });
272299 }
273300 }
274301
@@ -301,34 +328,56 @@ private void startSchemaChange() throws TimeoutException {
301328 LOG .info ("All flushed. Going to evolve schema for pending requests: {}" , pendingRequests );
302329 flushedSinkWriters .clear ();
303330
304- // Deduce what schema change events should be applied to sink table
305- List <SchemaChangeEvent > deducedSchemaChangeEvents = deduceEvolvedSchemaChanges ();
331+ // Deduce what schema change events should be applied to sink table, and affected sink
332+ // tables' schema
333+ Tuple2 <Set <TableId >, List <SchemaChangeEvent >> deduceSummary = deduceEvolvedSchemaChanges ();
306334
307335 // And tries to apply it to external system
308336 List <SchemaChangeEvent > successfullyAppliedSchemaChangeEvents = new ArrayList <>();
309- for (SchemaChangeEvent appliedSchemaChangeEvent : deducedSchemaChangeEvents ) {
337+ for (SchemaChangeEvent appliedSchemaChangeEvent : deduceSummary . f1 ) {
310338 if (applyAndUpdateEvolvedSchemaChange (appliedSchemaChangeEvent )) {
311339 successfullyAppliedSchemaChangeEvents .add (appliedSchemaChangeEvent );
312340 }
313341 }
314342
315- // Then, we broadcast affected schema changes to mapper and release upstream
316- pendingRequests .forEach (
317- (subTaskId , tuple ) -> {
318- LOG .info ("Coordinator finishes pending future from {}" , subTaskId );
319- tuple .f1 .complete (
320- wrap (new SchemaChangeResponse (successfullyAppliedSchemaChangeEvents )));
321- });
343+ // Fetch refreshed view for affected tables. We can't rely on operator clients to do this
344+ // because it might not have a complete schema view after restoring from previous states.
345+ Set <TableId > affectedTableIds = deduceSummary .f0 ;
346+ Map <TableId , Schema > evolvedSchemaView = new HashMap <>();
347+ for (TableId tableId : affectedTableIds ) {
348+ schemaManager
349+ .getLatestEvolvedSchema (tableId )
350+ .ifPresent (schema -> evolvedSchemaView .put (tableId , schema ));
351+ }
352+
353+ List <Tuple2 <SchemaChangeRequest , CompletableFuture <CoordinationResponse >>> futures =
354+ new ArrayList <>(pendingRequests .values ());
322355
356+ // Restore coordinator internal states first...
323357 pendingRequests .clear ();
324358
325359 LOG .info ("Finished schema evolving. Switching from EVOLVING to IDLE." );
326360 Preconditions .checkState (
327361 evolvingStatus .compareAndSet (RequestStatus .EVOLVING , RequestStatus .IDLE ),
328362 "RequestStatus should be EVOLVING when schema evolving finishes." );
363+
364+ // ... and broadcast affected schema changes to mapper and release upstream then.
365+ // Make sure we've cleaned-up internal state before this, or we may receive new requests in
366+ // a dirty state.
367+ futures .forEach (
368+ tuple -> {
369+ LOG .info (
370+ "Coordinator finishes pending future from {}" ,
371+ tuple .f0 .getSinkSubTaskId ());
372+ tuple .f1 .complete (
373+ wrap (
374+ new SchemaChangeResponse (
375+ evolvedSchemaView ,
376+ successfullyAppliedSchemaChangeEvents )));
377+ });
329378 }
330379
331- private List <SchemaChangeEvent > deduceEvolvedSchemaChanges () {
380+ private Tuple2 < Set < TableId >, List <SchemaChangeEvent > > deduceEvolvedSchemaChanges () {
332381 List <SchemaChangeRequest > validSchemaChangeRequests =
333382 pendingRequests .values ().stream ()
334383 .map (e -> e .f0 )
@@ -408,7 +457,7 @@ private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
408457 evolvedSchemaChanges .addAll (normalizedEvents );
409458 }
410459
411- return evolvedSchemaChanges ;
460+ return Tuple2 . of ( affectedSinkTableIds , evolvedSchemaChanges ) ;
412461 }
413462
414463 private boolean applyAndUpdateEvolvedSchemaChange (SchemaChangeEvent schemaChangeEvent ) {
0 commit comments