Skip to content

Commit

Permalink
Unblock waiting threads sooner (#975)
Browse files Browse the repository at this point in the history
* Unblock waiting threads sooner

* Unblock waiting threads sooner

* Use explicit CV to wait and notifyall blocking threads

* Use explicit CV to wait and notifyall blocking threads

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add state variable for intrinsic CV for notification

* Add comment

* Update version

* Update version

* Update comment

* Update test

* Update test
  • Loading branch information
hshukla authored Jan 19, 2024
1 parent 417d1f2 commit c7752df
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4134,12 +4134,12 @@ public void run() {
// Step 2
// Handling an event requires acquiring the coordinator's object.
handleEvent(new CoordinatorEvent(CoordinatorEvent.EventType.NO_OP, null));
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
if (isInterruptedInSleep) {
break;
}
}
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
}
};
testCoordinatorEventProcessor[0].setDaemon(true);
Expand Down Expand Up @@ -4172,7 +4172,6 @@ public void testSessionExpiryCallbackThreadAttemptingToAcquireCoordinatorObjectA
Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
properties.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
// custom heartbeat period of 2 second.
properties.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(testHeartbeatPeriod));

final Coordinator.CoordinatorEventProcessor[] testCoordinatorEventProcessor = {null};
Expand Down Expand Up @@ -4207,12 +4206,12 @@ public void run() {
} catch (InterruptedException e) {
isInterruptedInSleep = true;
}
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
if (isInterruptedInSleep) {
break;
}
}
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
}
};
testCoordinatorEventProcessor[0].setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware {
private final Map<String, SerdeAdmin> _serdeAdmins = new HashMap<>();
private final Map<String, Authorizer> _authorizers = new HashMap<>();
private volatile boolean _shutdown = false;
// TODO: we now have _shutdown, _eventThread, and _coordinatorEventThreadExiting, each with a distinct use;
// we should revisit and refactor to reduce the number of state variables.
private volatile boolean _coordinatorEventThreadExiting = false;

private CoordinatorEventProcessor _eventThread;
private ScheduledExecutorService _scheduledExecutor;
Expand Down Expand Up @@ -377,12 +380,20 @@ private synchronized boolean waitForEventThreadToJoin() {
// Waits for a notification for specified duration from the event thread before acquiring the Coordinator object.
private synchronized void waitForNotificationFromEventThread(Duration duration) {
try {
// This intrinsic conditional variable helps to halt threads (zk callback threads, main server thread) before
// This intrinsic condition variable (CV) helps to halt threads (zk callback threads, main server thread) before
// attempting to acquire the Coordinator object. We never halt the event thread (coordinator thread)
// explicitly via this CV.
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());

// The goal of this wait is to give the event thread a chance to acquire the lock on the coordinator object for
// handleEvent. Ideally, we would wait inside a while loop to guard against spurious wakeups, but the caller
// already invokes this method from a while loop, so a single wait is fine here. The if condition also keeps us
// from going back into the wait, because waiting may prevent shutdown from running gracefully if the
// connecting deployment system has shorter timeouts.
if (!_coordinatorEventThreadExiting) {
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());
}

} catch (InterruptedException exception) {
_log.warn("Exception caught while waiting for the notification from the event thread", exception);
}
Expand Down Expand Up @@ -2249,6 +2260,7 @@ private void populateDatastreamDestinationFromExistingDatastream(Datastream data
// We are only calling notify on the synchronized Coordinator Object's ("this") waiting threads.
// Suppressing the Naked_Notify warning on this.
// Sets _coordinatorEventThreadExiting to true and wakes every thread waiting on this object's monitor.
// The method is synchronized, so both the flag update and notifyAll() run while holding the lock on "this".
// NOTE(review): nothing in the visible code resets the flag to false, so after the first call
// waitForNotificationFromEventThread will no longer block — confirm this is intended.
protected synchronized void notifyThreadsWaitingForCoordinatorObjectSynchronization() {
// Set the flag before notifying: waitForNotificationFromEventThread checks it and skips the wait when it is true.
_coordinatorEventThreadExiting = true;
this.notifyAll();
}

Expand Down Expand Up @@ -2351,7 +2363,6 @@ public void run() {
CoordinatorEvent event = _eventQueue.take();
if (event != null) {
handleEvent(event);
notifyThreadsWaitingForCoordinatorObjectSynchronization();
}
} catch (InterruptedException e) {
_log.warn("CoordinatorEventProcessor interrupted", e);
Expand All @@ -2360,6 +2371,10 @@ public void run() {
_log.error("CoordinatorEventProcessor failed", t);
}
}
synchronized (this) {
_coordinatorEventThreadExiting = true;
this.notifyAll();
}
_log.info("END CoordinatorEventProcessor");
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "5.5.0-SNAPSHOT"
version = "5.5.1-SNAPSHOT"
}

subprojects {
Expand Down

0 comments on commit c7752df

Please sign in to comment.