Skip to content

Commit

Permalink
[debezium] bump debezium version to 1.9.7.
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed Jun 8, 2023
1 parent 087f9b1 commit 892af3f
Show file tree
Hide file tree
Showing 75 changed files with 2,873 additions and 2,935 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package com.ververica.cdc.connectors.base.dialect;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.util.FlinkRuntimeException;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
Expand Down Expand Up @@ -51,18 +49,7 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
* @param sourceConfig a basic source configuration.
* @return a utility that simplifies using a JDBC connection.
*/
default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
JdbcConnection jdbc =
new JdbcConnection(
sourceConfig.getDbzConfiguration(),
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()));
try {
jdbc.connect();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return jdbc;
}
JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig);

/** Get a connection pool factory to create connection pool. */
JdbcConnectionPoolFactory getPooledDataSourceFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import io.debezium.document.DocumentWriter;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
Expand Down Expand Up @@ -57,7 +59,7 @@
* this is useful for downstream to deserialize the {@link HistoryRecord} back.
* </pre>
*/
public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatcher<P, TableId> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class);

public static final String HISTORY_RECORD_FIELD = "historyRecord";
Expand Down Expand Up @@ -132,7 +134,9 @@ public ChangeEventQueue<DataChangeEvent> getQueue() {

@Override
public void dispatchSchemaChangeEvent(
TableId dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter)
P partition,
TableId dataCollectionId,
SchemaChangeEventEmitter schemaChangeEventEmitter)
throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
Expand All @@ -141,6 +145,11 @@ public void dispatchSchemaChangeEvent(
}
}
schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
IncrementalSnapshotChangeEventSource<P, TableId> incrementalEventSource =
getIncrementalSnapshotChangeEventSource();
if (incrementalEventSource != null) {
incrementalEventSource.processSchemaChange(partition, dataCollectionId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}

@Override
public void recover(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
offsets.forEach((source, position) -> recover(source, position, schema, ddlParser));
}

@Override
public void stop() {
listener.stopped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public interface FetchTask<Split> {
/** Returns the split that the task used. */
Split getSplit();

/** Close current task. * */
void close();

/** Base context used in the execution of fetch task. */
interface Context {
void configure(SourceSplitBase sourceSplitBase);
Expand All @@ -63,5 +66,7 @@ interface Context {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);

List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);

void close() throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}

if (snapshotSplitReadTask != null) {
snapshotSplitReadTask.close();
}

if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}

if (streamFetchTask != null) {
streamFetchTask.close();
}

if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -169,4 +170,6 @@ public SchemaNameAdjuster getSchemaNameAdjuster() {
public abstract JdbcSourceEventDispatcher getDispatcher();

public abstract OffsetContext getOffsetContext();

public abstract Partition getPartition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}

@Override
public void recover(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
offsets.forEach((source, position) -> recover(source, position, schema, ddlParser));
}

@Override
public void stop() {
listener.stopped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.ververica.cdc.connectors.base.experimental.fetch.MySqlStreamFetchTask;
import com.ververica.cdc.connectors.base.experimental.utils.MySqlSchema;
import com.ververica.cdc.connectors.base.experimental.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand All @@ -54,6 +56,8 @@
@Experimental
public class MySqlDialect implements JdbcDataSourceDialect {

private static final String QUOTED_CHARACTER = "`";

private static final long serialVersionUID = 1L;
private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
Expand All @@ -69,6 +73,21 @@ public String getName() {
return "MySQL";
}

public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
JdbcConnection jdbc =
new JdbcConnection(
JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()),
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()),
QUOTED_CHARACTER,
QUOTED_CHARACTER);
try {
jdbc.connect();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return jdbc;
}

@Override
public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
Expand Down
Loading

0 comments on commit 892af3f

Please sign in to comment.