Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Deadlock Bug Between ZK Callback & Event Thread On Acquiring Coordinator Object #964

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.linkedin.datastream.connectors.kafka;

import java.time.Duration;
import java.util.Properties;

import com.linkedin.datastream.common.zk.ZkClient;
Expand Down Expand Up @@ -41,6 +42,9 @@ public static Coordinator createCoordinator(String zkAddr, String cluster, Prope
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
CachedDatastreamReader cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand All @@ -178,6 +181,9 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand Down Expand Up @@ -2986,6 +2992,7 @@ public void testCoordinatorLeaderCleanupTasksPostElection() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3098,6 +3105,7 @@ public void testNewlyElectedLeaderRevokesAssignmentTokens() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(1000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3185,6 +3193,7 @@ void testOnSessionExpired(boolean handleNewSession) throws DatastreamException,
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_REINIT_ON_NEW_ZK_SESSION, String.valueOf(handleNewSession));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3275,6 +3284,7 @@ public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3341,6 +3351,7 @@ public void testTokensNotClaimedForConnectorThatFailedToStop() throws Exception
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_TIMEOUT_MS, "100");
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS, "10");
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3419,6 +3430,7 @@ public void testLeaderDoesNotPollForTokensIfFeatureIsDisabled() throws Exception
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(false));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3452,6 +3464,7 @@ public void testLeaderPollsForTokensAndRevokesThemIfTheyAreUnclaimed() throws Ex
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(6000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3503,6 +3516,7 @@ public void testLeaderPollsForTokensAndMarksTheDatastreamStoppedIfTheyAreClaimed
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(6000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -4077,6 +4091,153 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
});
}

/**
 * Checks that handling a ZK session expiry does not deadlock when the thread calling
 * onSessionExpired() reaches the Coordinator object before the event thread handles an event.
 *
 * The Coordinator is created with a substitute event thread that sleeps for the heartbeat
 * period plus 500 ms, then handles a NO_OP event and notifies any waiting threads. After
 * onSessionExpired() is invoked, the test expects isZkSessionExpired() to become true and the
 * event thread to stop being alive.
 */
@Test
public void testSessionExpiryCallbackThreadAttemptingToAcquireCoordinatorObjectBeforeHandlingEvent() throws Exception {
String testCluster = "dummyCluster";
long testHeartbeatPeriod = Duration.ofSeconds(2).toMillis();

Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
properties.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
// Custom heartbeat period of 2 seconds.
properties.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(testHeartbeatPeriod));

// Single-element array so the anonymous Coordinator subclass below can assign the
// event thread it creates and hand it back through getEventThread().
final Coordinator.CoordinatorEventProcessor[] testCoordinatorEventProcessor = {null};
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {
@Override
protected synchronized void createEventThread() {
testCoordinatorEventProcessor[0] = new CoordinatorEventProcessor() {
// Mimics the run loop of the coordinator's event thread:
// 1. Sleep before calling handleEvent so that the zk session expiry
// thread can acquire the coordinator object before the event thread
// enters handleEvent.
// 2. Handle a No-Op event.
// 3. Notify the zk session expiry threads so that they attempt to acquire
// the coordinator object.
@Override
public void run() {
// Set when the event thread is interrupted while it is sleeping.
// The interrupt is only recorded here so that steps 2 and 3 still
// run once more before the loop exits.
boolean isInterruptedInSleep = false;
while (!isInterrupted()) {
try {
// Step 1
// Sleep for longer than the heartbeat period to mock the
// scenario where the zk session expiry thread acquires the
// coordinator object before the event thread does.
Thread.sleep(testHeartbeatPeriod + Duration.ofMillis(500).toMillis());
} catch (InterruptedException e) {
isInterruptedInSleep = true;
}
// Step 2
// Handling an event requires acquiring the coordinator's object.
handleEvent(new CoordinatorEvent(CoordinatorEvent.EventType.NO_OP, null));
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
if (isInterruptedInSleep) {
break;
}
}
}
};
testCoordinatorEventProcessor[0].setDaemon(true);
}

@Override
CoordinatorEventProcessor getEventThread() {
return testCoordinatorEventProcessor[0];
}
});

coordinator.start();
// NOTE(review): this client is created and closed but never otherwise used in this test --
// confirm whether it is needed.
ZkClient zkClient = new ZkClient(_zkConnectionString);

coordinator.onSessionExpired();
// Presumably PollUtils.poll takes (condition, poll period in ms, timeout in ms) -- confirm
// against PollUtils.
Assert.assertTrue(PollUtils.poll(coordinator::isZkSessionExpired, 100, testHeartbeatPeriod));
// Make sure we don't run into a deadlock: the event thread must no longer be alive.
Assert.assertTrue(PollUtils.poll(() -> !coordinator.getEventThread().isAlive(), 100, testHeartbeatPeriod));

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();
}

/**
 * Checks that handling a ZK session expiry does not deadlock when the thread calling
 * onSessionExpired() has to wait for a notification from the event thread.
 *
 * The Coordinator is created with a substitute event thread that handles a single NO_OP event
 * up front, then repeatedly sleeps for the heartbeat period minus 500 ms and notifies any
 * waiting threads. After onSessionExpired() is invoked, the test expects isZkSessionExpired()
 * to become true and the event thread to stop being alive.
 */
@Test
public void testSessionExpiryCallbackThreadAttemptingToAcquireCoordinatorObjectAfterHandlingEvent() throws Exception {
String testCluster = "dummyCluster";
long testHeartbeatPeriod = Duration.ofSeconds(2).toMillis();

Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
properties.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
// Custom heartbeat period of 2 seconds.
properties.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(testHeartbeatPeriod));

// Single-element array so the anonymous Coordinator subclass below can assign the
// event thread it creates and hand it back through getEventThread().
final Coordinator.CoordinatorEventProcessor[] testCoordinatorEventProcessor = {null};
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {
@Override
protected synchronized void createEventThread() {
testCoordinatorEventProcessor[0] = new CoordinatorEventProcessor() {
// Mimics the run loop of the coordinator's event thread:
// 1. Handle a No-Op event.
// 2. Sleep after calling handleEvent so that the zk session expiry
// thread ends up waiting for a notification from the event thread
// before it can access the coordinator object.
// 3. Notify the zk session expiry threads so that they attempt to acquire
// the coordinator object.
@Override
public void run() {
// Set when the event thread is interrupted while it is sleeping.
// The interrupt is only recorded here so that step 3 still runs
// once more before the loop exits.
boolean isInterruptedInSleep = false;
// Step 1
// Handling an event requires acquiring the coordinator's object.
handleEvent(new CoordinatorEvent(CoordinatorEvent.EventType.NO_OP, null));
while (!isInterrupted()) {
try {
// Step 2
// Sleep for less than the heartbeat period to mock the
// scenario where the zk session expiry thread is waiting
// for a notification from the event thread.
Thread.sleep(testHeartbeatPeriod - Duration.ofMillis(500).toMillis());
} catch (InterruptedException e) {
isInterruptedInSleep = true;
}
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
if (isInterruptedInSleep) {
break;
}
}
}
};
testCoordinatorEventProcessor[0].setDaemon(true);
}

@Override
CoordinatorEventProcessor getEventThread() {
return testCoordinatorEventProcessor[0];
}
});


coordinator.start();
// NOTE(review): this client is created and closed but never otherwise used in this test --
// confirm whether it is needed.
ZkClient zkClient = new ZkClient(_zkConnectionString);

coordinator.onSessionExpired();
// Presumably PollUtils.poll takes (condition, poll period in ms, timeout in ms) -- confirm
// against PollUtils.
Assert.assertTrue(PollUtils.poll(coordinator::isZkSessionExpired, 100, testHeartbeatPeriod));
// Make sure we don't run into a deadlock: the event thread must no longer be alive.
Assert.assertTrue(PollUtils.poll(() -> !coordinator.getEventThread().isAlive(), 100, testHeartbeatPeriod));

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();
}

// This helper function helps compare the requesting topics with the topics reflected in the server.
private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream testStream, Coordinator coordinator,
Set<String> requestedThroughputViolatingTopics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,23 +321,30 @@ public void start() {
_heartbeatPeriod.toMillis() * 3, _heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS);
}

private synchronized void createEventThread() {
protected synchronized void createEventThread() {
_eventThread = new CoordinatorEventProcessor();
_eventThread.setDaemon(true);
}

// Starts the event thread, unless _shutdown is already set.
// NOTE(review): both `_eventThread.start()` and `eventThread.start()` appear below. This looks
// like the removed and added lines of a diff rendered together -- confirm against the actual
// file that only the getEventThread() variant remains.
private synchronized void startEventThread() {
if (!_shutdown) {
_eventThread.start();
CoordinatorEventProcessor eventThread = getEventThread();
eventThread.start();
}
}

private synchronized boolean stopEventThread() {
// interrupt the thread if it's not gracefully shutdown
while (_eventThread.isAlive()) {
CoordinatorEventProcessor eventThread = getEventThread();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not pointing at _eventThread instead of calling getEventThread()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an API makes it much better to test it with overrides instead of mocking the var, hence I changed the references to use the APIs to fetch the variable value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

while (eventThread.isAlive()) {
// Waits for at most _heartbeatPeriod to acquire the Coordinator object.
// The time-bounded wait keeps the caller thread from waiting indefinitely if
// the event thread has already shut down.
waitForNotificationFromEventThread(_heartbeatPeriod);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wait for (_heartbeatPeriod - some delta)? This comment is valid only if this thread is responsible for punching heartbeats.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the coordinator thread is responsible for punching heartbeats. And we are never waiting on the coordinator thread. We are only waiting on any ZK callback threads or the main datastream server thread.
So, this should not be a problem.

try {
_eventThread.interrupt();
_eventThread.join(EVENT_THREAD_SHORT_JOIN_TIMEOUT);
_log.info("Attempting to interrupt the event thread.");
eventThread.interrupt();
eventThread.join(EVENT_THREAD_SHORT_JOIN_TIMEOUT);
} catch (InterruptedException e) {
_log.warn("Exception caught while interrupting the event thread", e);
return true;
Expand All @@ -346,16 +353,40 @@ private synchronized boolean stopEventThread() {
return false;
}

// Waits for the event thread to die.
// Returns false right away if the event thread is not alive. Otherwise waits up to
// _heartbeatPeriod for a notification from the event thread and then joins it with
// EVENT_THREAD_LONG_JOIN_TIMEOUT. Returns true only if the calling thread is interrupted
// while joining; a false return does not by itself show that the event thread has died.
// NOTE(review): both `_eventThread.join(...)` and `eventThread.join(...)` appear below. This
// looks like the removed and added lines of a diff rendered together -- confirm against the
// actual file that only the getEventThread() variant remains.
private synchronized boolean waitForEventThreadToJoin() {
CoordinatorEventProcessor eventThread = getEventThread();
if (!eventThread.isAlive()) {
return false;
}
// Waits for at most _heartbeatPeriod to acquire the Coordinator object.
// The time-bounded wait keeps the caller thread from waiting indefinitely if
// the event thread has already shut down.
waitForNotificationFromEventThread(_heartbeatPeriod);
try {
_eventThread.join(EVENT_THREAD_LONG_JOIN_TIMEOUT);
_log.info("Waiting for {} milliseconds for the event thread to die.", EVENT_THREAD_LONG_JOIN_TIMEOUT);
eventThread.join(EVENT_THREAD_LONG_JOIN_TIMEOUT);
} catch (InterruptedException e) {
_log.warn("Exception caught while waiting the event thread to stop", e);
return true;
}
return false;
}

// Waits for a notification for specified duration from the event thread before acquiring the Coordinator object.
private synchronized void waitForNotificationFromEventThread(Duration duration) {
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to validate that the object lock is held when we enter this method (as we are calling wait here)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validate that the wait releases the lock on the "this" object? Since it's an intrinsic property of the wait function to release the lock on the object from which it is called, I'm not sure we need validation in code.

In the tests, it should be validating that. I have tried reproducing the deadlock scenario in one of the tests and ensuring no deadlock would mean that the object lock is released when the zk session expiry thread is waiting.

// This intrinsic conditional variable helps to halt threads (zk callback threads, main server thread) before
// attempting to acquire the Coordinator object. We never halt the event thread (coordinator thread)
// explicitly via this CV.
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());
} catch (InterruptedException exception) {
_log.warn("Exception caught while waiting for the notification from the event thread", exception);
}
}

/**
* Stop coordinator (and all connectors)
*/
Expand Down Expand Up @@ -2206,6 +2237,14 @@ private void populateDatastreamDestinationFromExistingDatastream(Datastream data
existingStream.getMetadata().get(DatastreamMetadataConstants.TASK_PREFIX));
}

// Via the intrinsic conditional variable, notify other threads that might
// be waiting on acquiring access on the Coordinator object.
// We are only calling notify on the synchronized Coordinator Object's ("this") waiting threads.
// Suppressing the Naked_Notify warning on this.
protected synchronized void notifyThreadsWaitingForCoordinatorObjectSynchronization() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this helper method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to use this from a couple of tests and hence decided to keep it in a generic function which can be reused.

this.notifyAll();
}

@Override
public List<BrooklinMetricInfo> getMetricInfos() {
return _metrics.getMetricInfos();
Expand Down Expand Up @@ -2296,7 +2335,7 @@ CachedDatastreamReader getDatastreamCache() {
return _datastreamCache;
}

private class CoordinatorEventProcessor extends Thread {
protected class CoordinatorEventProcessor extends Thread {
@Override
public void run() {
_log.info("START CoordinatorEventProcessor thread");
Expand All @@ -2305,6 +2344,7 @@ public void run() {
CoordinatorEvent event = _eventQueue.take();
if (event != null) {
handleEvent(event);
notifyThreadsWaitingForCoordinatorObjectSynchronization();
}
} catch (InterruptedException e) {
_log.warn("CoordinatorEventProcessor interrupted", e);
Expand Down
Loading