Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[debezium] Bump debezium version to 1.9.7.Final #2156

Merged
merged 2 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package com.ververica.cdc.connectors.base.dialect;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.util.FlinkRuntimeException;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
Expand Down Expand Up @@ -51,18 +49,7 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
* @param sourceConfig a basic source configuration.
* @return a utility that simplifies using a JDBC connection.
*/
default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
JdbcConnection jdbc =
new JdbcConnection(
sourceConfig.getDbzConfiguration(),
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()));
try {
jdbc.connect();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return jdbc;
}
JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig);

/** Get a connection pool factory to create connection pool. */
JdbcConnectionPoolFactory getPooledDataSourceFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import io.debezium.document.DocumentWriter;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
Expand Down Expand Up @@ -57,7 +59,7 @@
* this is useful for downstream to deserialize the {@link HistoryRecord} back.
* </pre>
*/
public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatcher<P, TableId> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class);

public static final String HISTORY_RECORD_FIELD = "historyRecord";
Expand Down Expand Up @@ -132,7 +134,9 @@ public ChangeEventQueue<DataChangeEvent> getQueue() {

@Override
public void dispatchSchemaChangeEvent(
TableId dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter)
P partition,
TableId dataCollectionId,
SchemaChangeEventEmitter schemaChangeEventEmitter)
throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
Expand All @@ -141,6 +145,11 @@ public void dispatchSchemaChangeEvent(
}
}
schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
IncrementalSnapshotChangeEventSource<P, TableId> incrementalEventSource =
getIncrementalSnapshotChangeEventSource();
if (incrementalEventSource != null) {
incrementalEventSource.processSchemaChange(partition, dataCollectionId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}

@Override
public void recover(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
offsets.forEach((source, position) -> recover(source, position, schema, ddlParser));
}

@Override
public void stop() {
listener.stopped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public interface FetchTask<Split> {
/** Returns the split that the task used. */
Split getSplit();

/** Stops current task, most of the implementations don't need this. */
default void stop() {}
/** Close current task. * */
void close();

/** Base context used in the execution of fetch task. */
interface Context {
Expand All @@ -66,5 +66,7 @@ interface Context {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);

List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);

void close() throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}

if (snapshotSplitReadTask != null) {
snapshotSplitReadTask.close();
}

if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,15 @@ private void configureFilter() {
this.pureStreamPhaseTables.clear();
}

public void stopReadTask() {
public void stopReadTask() throws Exception {
this.currentTaskRunning = false;

if (taskContext != null) {
taskContext.close();
}

if (streamFetchTask != null) {
streamFetchTask.stop();
streamFetchTask.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -168,4 +169,6 @@ public SchemaNameAdjuster getSchemaNameAdjuster() {
public abstract JdbcSourceEventDispatcher getDispatcher();

public abstract OffsetContext getOffsetContext();

public abstract Partition getPartition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}

@Override
public void recover(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
offsets.forEach((source, position) -> recover(source, position, schema, ddlParser));
}

@Override
public void stop() {
listener.stopped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.ververica.cdc.connectors.base.experimental.fetch.MySqlStreamFetchTask;
import com.ververica.cdc.connectors.base.experimental.utils.MySqlSchema;
import com.ververica.cdc.connectors.base.experimental.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand All @@ -54,6 +56,8 @@
@Experimental
public class MySqlDialect implements JdbcDataSourceDialect {

private static final String QUOTED_CHARACTER = "`";

private static final long serialVersionUID = 1L;
private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
Expand All @@ -69,6 +73,21 @@ public String getName() {
return "MySQL";
}

public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
JdbcConnection jdbc =
new JdbcConnection(
JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()),
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()),
QUOTED_CHARACTER,
QUOTED_CHARACTER);
try {
jdbc.connect();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return jdbc;
}

@Override
public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
Expand Down
Loading