Skip to content

Commit

Permalink
DBZ-7570/debezium#133: started to add workaround using SO_LINGER with…
Browse files Browse the repository at this point in the history
… 0 timeout if use.nongraceful.disconnect=true
  • Loading branch information
acristu committed Apr 18, 2024
1 parent 440b728 commit f3fa506
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,16 @@ public interface SnapshotLockingStrategy {
.withDescription("Interval for connection checking if keep alive thread is used, given in milliseconds "
+ "Defaults to 1 minute (60,000 ms).");

public static final Field USE_NONGRACEFUL_DISCONNECT = Field.create("use.nongraceful.disconnect")
.withDisplayName("Use Non-graceful Disconnect")
.withType(ConfigDef.Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 4))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(false)
.withDescription("Whether to use `socket.setSoLinger(true, 0)` when BinaryLogClient"
+ " keepalive thread triggers a disconnect for a stale connection.");

public static final Field ROW_COUNT_FOR_STREAMING_RESULT_SETS = Field.create("min.row.count.to.stream.results")
.withDisplayName("Stream result set of size")
.withType(ConfigDef.Type.INT)
Expand Down Expand Up @@ -676,6 +686,7 @@ public interface SnapshotLockingStrategy {
CONNECTION_TIMEOUT_MS,
KEEP_ALIVE,
KEEP_ALIVE_INTERVAL_MS,
USE_NONGRACEFUL_DISCONNECT,
SNAPSHOT_MODE,
SNAPSHOT_QUERY_MODE,
SNAPSHOT_QUERY_MODE_CUSTOM_NAME,
Expand Down Expand Up @@ -961,4 +972,8 @@ private static int validateGtidSetExcludes(Configuration config, Field field, Va
}
return 0;
}

public boolean usesNonGracefulDisconnect() {
return config.getBoolean(USE_NONGRACEFUL_DISCONNECT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ protected BinaryLogClient createBinaryLogClient(BinlogTaskContext<?> taskContext
client.setSslSocketFactory(sslSocketFactory);
}
}
client.setUseNonGracefulDisconnect(connectorConfig.usesNonGracefulDisconnect());

configureReplicaCompatibility(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,53 @@ public void shouldConsumeEventsWithNoSnapshot() throws SQLException, Interrupted
});
}

@Test
@FixFor("DBZ-7570 - workaround")
public void shouldConsumeEventsWithNonGracefulDisconnect() throws SQLException, InterruptedException {
Files.delete(SCHEMA_HISTORY_PATH);

// Use the DB configuration to define the connector's configuration ...
config = RO_DATABASE.defaultConfig()
.with(BinlogConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(BinlogConnectorConfig.USE_NONGRACEFUL_DISCONNECT, true)
.build();

// Start the connector ...
start(getConnectorClass(), config);

// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
assertThat(recordsForTopicForRoProductsTable(records).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("Products")).size()).isEqualTo(9);
assertThat(records.topics().size()).isEqualTo(4 + 1);
assertThat(records.ddlRecordsForDatabase(RO_DATABASE.getDatabaseName()).size()).isEqualTo(6);

// check float value
Optional<SourceRecord> recordWithScientfic = records.recordsForTopic(RO_DATABASE.topicForTable("Products")).stream()
.filter(x -> "hammer2".equals(getAfter(x).get("name"))).findFirst();
assertThat(recordWithScientfic.isPresent());
assertThat(getAfter(recordWithScientfic.get()).get("weight")).isEqualTo(0.875f);

// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);

// More records may have been written (if this method were run after the others), but we don't care ...
stopConnector();

records.recordsForTopic(RO_DATABASE.topicForTable("orders")).forEach(record -> {
print(record);
});

records.recordsForTopic(RO_DATABASE.topicForTable("customers")).forEach(record -> {
print(record);
});
}

@Test
@FixFor("DBZ-1962")
public void shouldConsumeEventsWithIncludedColumns() throws SQLException, InterruptedException {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
<!-- Database drivers, should align with databases -->
<version.postgresql.driver>42.6.1</version.postgresql.driver>
<version.mysql.driver>8.3.0</version.mysql.driver>
<version.mysql.binlog>0.29.1</version.mysql.binlog>
<version.mysql.binlog>0.29.2</version.mysql.binlog>
<version.mongo.driver>4.11.0</version.mongo.driver>
<version.sqlserver.driver>12.4.2.jre8</version.sqlserver.driver>
<version.db2.driver>11.5.0.0</version.db2.driver>
Expand Down

0 comments on commit f3fa506

Please sign in to comment.