Skip to content

Commit

Permalink
DBZ-8414 Catch exception from stopping the connector
Browse files Browse the repository at this point in the history
Otherwise engine shutdown may get stuck as in such case the count down
of `shutDownLatch` would be skipped and engine would wait for
`shutDownLatch` forever.
  • Loading branch information
vjuranek authored and jpechane committed Nov 20, 2024
1 parent 378ef7c commit 655ccef
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,12 @@ public void runWithTask(final Consumer<SourceTask> consumer) {
* @param stateBeforeStop {@link State} of the engine when the shutdown was requested.
*/
private void close(final State stateBeforeStop) {
stopConnector(tasks, stateBeforeStop);
try {
stopConnector(tasks, stateBeforeStop);
}
catch (Exception e) {
LOGGER.warn("Failed to stop connector properly: ", e);
}
if (headerConverter != null) {
try {
headerConverter.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,66 @@ public void testCompletionCallbackCalledUponFailure() throws Exception {
assertThat(callbackLatch.getCount()).isEqualTo(0);
}

@Test
@FixFor("DBZ-8414")
public void testErrorInConnectorCallbackDoesNotBlockShutdown() throws Exception {
final Properties props = new Properties();
props.put(ConnectorConfig.NAME_CONFIG, "debezium-engine");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSourceConnector.class.getName());
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "0");
props.put(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString());
props.put(FileStreamSourceConnector.TOPIC_CONFIG, "testTopic");

appendLinesToSource(NUMBER_OF_LINES);
AtomicInteger recordsSent = new AtomicInteger();
CountDownLatch recordsLatch = new CountDownLatch(1);

DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback() {
@Override
public void connectorStopped() {
super.connectorStopped();
throw new RuntimeException("User connector callback exception, enjoy");
}
})
.notifying((records, committer) -> {
for (SourceRecord r : records) {
committer.markProcessed(r);
recordsSent.getAndIncrement();
}
committer.markBatchFinished();
recordsLatch.countDown();
}).build();

// Start engine and make sure it's running.
engineExecSrv.submit(() -> {
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(recordsSent.get()).isEqualTo(NUMBER_OF_LINES);

// Stop engine in another thread to avoid blocking the main thread if it gets stuck.
CountDownLatch shutdownLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().submit(() -> {
try {
engine.close();
shutdownLatch.countDown();
}
catch (IOException e) {
// pass
}
});

// Assert that latch was counted down, i.e. closing the engine hasn't blocked forever.
shutdownLatch.await(1, TimeUnit.SECONDS);
assertThat(shutdownLatch.getCount()).isEqualTo(0);
}

@Test
@FixFor("DBZ-2534")
public void testCannotStopWhileTasksAreStarting() throws Exception {
Expand Down

0 comments on commit 655ccef

Please sign in to comment.