Skip to content

Commit

Permalink
[debezium] bump debezium version to 1.9.7.
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed May 30, 2023
1 parent 39823d4 commit 8684c7f
Show file tree
Hide file tree
Showing 58 changed files with 2,300 additions and 4,071 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package com.ververica.cdc.connectors.base.dialect;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.util.FlinkRuntimeException;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
Expand Down Expand Up @@ -51,18 +49,7 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
* @param sourceConfig a basic source configuration.
* @return a utility that simplifies using a JDBC connection.
*/
default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
JdbcConnection jdbc =
new JdbcConnection(
sourceConfig.getDbzConfiguration(),
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()));
try {
jdbc.connect();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return jdbc;
}
JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig);

/** Get a connection pool factory to create connection pool. */
JdbcConnectionPoolFactory getPooledDataSourceFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import io.debezium.document.DocumentWriter;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
Expand Down Expand Up @@ -57,7 +59,7 @@
* this is useful for downstream to deserialize the {@link HistoryRecord} back.
* </pre>
*/
public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatcher<P, TableId> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class);

public static final String HISTORY_RECORD_FIELD = "historyRecord";
Expand Down Expand Up @@ -132,7 +134,9 @@ public ChangeEventQueue<DataChangeEvent> getQueue() {

@Override
public void dispatchSchemaChangeEvent(
TableId dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter)
P partition,
TableId dataCollectionId,
SchemaChangeEventEmitter schemaChangeEventEmitter)
throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
Expand All @@ -141,6 +145,11 @@ public void dispatchSchemaChangeEvent(
}
}
schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
IncrementalSnapshotChangeEventSource<P, TableId> incrementalEventSource =
getIncrementalSnapshotChangeEventSource();
if (incrementalEventSource != null) {
incrementalEventSource.processSchemaChange(partition, dataCollectionId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}

@Override
public void recover(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
offsets.forEach((source, position) -> recover(source, position, schema, ddlParser));
}

@Override
public void stop() {
listener.stopped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -169,4 +170,6 @@ public SchemaNameAdjuster getSchemaNameAdjuster() {
public abstract JdbcSourceEventDispatcher getDispatcher();

public abstract OffsetContext getOffsetContext();

public abstract Partition getPartition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}

@Override
public void recover(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
offsets.forEach((source, position) -> recover(source, position, schema, ddlParser));
}

@Override
public void stop() {
listener.stopped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.ververica.cdc.connectors.base.experimental.fetch.MySqlStreamFetchTask;
import com.ververica.cdc.connectors.base.experimental.utils.MySqlSchema;
import com.ververica.cdc.connectors.base.experimental.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand All @@ -54,6 +56,8 @@
@Experimental
public class MySqlDialect implements JdbcDataSourceDialect {

private static final String QUOTED_CHARACTER = "`";

private static final long serialVersionUID = 1L;
private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
Expand All @@ -69,6 +73,21 @@ public String getName() {
return "MySQL";
}

public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
JdbcConnection jdbc =
new JdbcConnection(
JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()),
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()),
QUOTED_CHARACTER,
QUOTED_CHARACTER);
try {
jdbc.connect();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return jdbc;
}

@Override
public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
Expand Down
Loading

0 comments on commit 8684c7f

Please sign in to comment.