Skip to content

Commit 5ec21a4

Browse files
committed
[FLINK-36406] Close MetadataApplier when the job stops
1 parent 4b13c49 commit 5ec21a4

File tree

3 files changed

+16
-1
lines changed

3 files changed

+16
-1
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
3232
@PublicEvolving
33-
public interface MetadataApplier extends Serializable {
33+
public interface MetadataApplier extends Serializable, AutoCloseable {
3434

3535
/** Apply the given {@link SchemaChangeEvent} to external systems. */
3636
void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException;
@@ -50,4 +50,7 @@ default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEve
5050
default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
5151
return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
5252
}
53+
54+
@Override
55+
default void close() throws Exception {}
5356
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,12 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
148148
});
149149
}
150150

151+
public void close() throws Exception {
152+
if (catalog != null) {
153+
catalog.close();
154+
}
155+
}
156+
151157
private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException {
152158
try {
153159
if (!catalog.databaseExists(event.tableId().getSchemaName())) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,12 @@ public void close() throws IOException {
348348
if (schemaChangeThreadPool != null) {
349349
schemaChangeThreadPool.shutdown();
350350
}
351+
352+
try {
353+
metadataApplier.close();
354+
} catch (Exception e) {
355+
throw new IOException("Failed to close metadata applier.", e);
356+
}
351357
}
352358

353359
private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {

0 commit comments

Comments
 (0)