Skip to content

Commit 4123b16

Browse files
shanthooshjagadish-v0
authored and committed
SAMZA-1568: Handle ZkInterruptedException in zkclient.close.
When ZooKeeper session failures occur in a stream processor, the processor leaves the group (the zkClient is closed) and then joins the group again. The last step in that shutdown sequence is zkClient.close(). In some scenarios, it throws the following exception: org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException at org.I0Itec.zkclient.ZkClient.close(ZkClient.java:1278) at org.apache.samza.zk.ZkControllerImpl.stop(ZkControllerImpl.java:92) at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:141). In the existing implementation this exception is not handled, thereby killing the stream processor. The following code path triggers this exception: `StreamProcessor.stop -> ZkJobCoordinator.stop() -> zkController.stop() -> zkUtils.close`. This exception causes the integration test to fail occasionally and can cause the LocalApplicationRunner.waitForFinish method call to block indefinitely (since it is the success of this callback event that updates the latch state required for waitForFinish to end). Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com> Reviewers: Jagadish <jagadish@apache.org> Closes apache#416 from shanthoosh/zk_utils_close
1 parent 002e131 commit 4123b16

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void stop() {
8989

9090
// close zk connection
9191
if (zkUtils != null) {
92-
zkUtils.getZkClient().close();
92+
zkUtils.close();
9393
}
9494
}
9595

samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ public int getGeneration() {
9494
return currentGeneration.get();
9595
}
9696

97-
98-
9997
public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) {
10098
this.keyBuilder = zkKeyBuilder;
10199
this.connectionTimeoutMs = connectionTimeoutMs;
@@ -298,7 +296,13 @@ public boolean exists(String path) {
298296
}
299297

300298
public void close() throws ZkInterruptedException {
301-
zkClient.close();
299+
try {
300+
zkClient.close();
301+
} catch (ZkInterruptedException e) {
302+
// Swallowing due to occurrence in the last stage of lifecycle (Not actionable) and clear the interrupted status.
303+
Thread.interrupted();
304+
LOG.warn("Ignoring the exception when closing the zookeeper client.", e);
305+
}
302306
}
303307

304308
/**

samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.I0Itec.zkclient.IZkDataListener;
3131
import org.I0Itec.zkclient.ZkClient;
3232
import org.I0Itec.zkclient.ZkConnection;
33+
import org.I0Itec.zkclient.exception.ZkInterruptedException;
3334
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
3435
import org.apache.commons.lang3.reflect.FieldUtils;
3536
import org.apache.samza.SamzaException;
@@ -46,6 +47,7 @@
4647
import org.junit.Rule;
4748
import org.junit.Test;
4849
import org.junit.rules.ExpectedException;
50+
import org.mockito.Mockito;
4951

5052

5153
public class TestZkUtils {
@@ -88,8 +90,9 @@ public void testSetup() {
8890

8991
@After
9092
public void testTeardown() {
91-
zkUtils.close();
92-
zkClient.close();
93+
if (zkClient != null) {
94+
zkUtils.close();
95+
}
9396
}
9497

9598
private ZkUtils getZkUtils() {
@@ -392,6 +395,15 @@ public int compare(String o1, String o2) {
392395

393396
}
394397

398+
@Test
399+
public void testCloseShouldNotThrowZkInterruptedExceptionToCaller() {
400+
ZkClient zkClient = Mockito.mock(ZkClient.class);
401+
ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
402+
SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
403+
Mockito.doThrow(new ZkInterruptedException(new InterruptedException())).when(zkClient).close();
404+
zkUtils.close();
405+
}
406+
395407
public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
396408
long delay = startDelayMs;
397409
while (delay < maxDelayMs) {

0 commit comments

Comments
 (0)