Skip to content

Commit

Permalink
CURATOR-696. Fix double leader for LeaderLatch (#500)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Kezhu Wang <kezhuw@apache.org>
  • Loading branch information
tisonkun and kezhuw authored May 21, 2024
1 parent 82f2e53 commit 1027c2c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 33 deletions.
6 changes: 6 additions & 0 deletions curator-recipes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<scope>test</scope>
</dependency>

<!-- AssertJ fluent assertions, available to test code only (scope "test"). -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed by a parent POM - confirm. -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
getChildren();
}
} else {
log.error("getChildren() failed. rc = " + event.getResultCode());
log.error("getChildren() failed. rc = {}", event.getResultCode());
}
}
};
Expand Down Expand Up @@ -548,43 +548,57 @@ private void checkLeadership(List<String> children) throws Exception {
log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);

if (ourIndex < 0) {
log.error("Can't find our node. Resetting. Index: " + ourIndex);
log.error("Can't find our node. Resetting. Index: {}", ourIndex);
reset();
} else if (ourIndex == 0) {
lastPathIsLeader.set(localOurPath);
setLeadership(true);
} else {
setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) {
try {
getChildren();
} catch (Exception ex) {
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
return;
}

if (ourIndex == 0) {
client.getData()
.inBackground((client, event) -> {
final long ephemeralOwner =
event.getStat() != null ? event.getStat().getEphemeralOwner() : -1;
final long thisSessionId =
client.getZookeeperClient().getZooKeeper().getSessionId();
if (ephemeralOwner != thisSessionId) {
// this node is gone - reset
reset();
} else {
lastPathIsLeader.set(localOurPath);
setLeadership(true);
}
}
}
};
})
.forPath(localOurPath);
return;
}

BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
// previous node is gone - retry getChildren
setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) {
try {
getChildren();
} catch (Exception ex) {
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData()
.usingWatcher(watcher)
.inBackground(callback)
.forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
};

BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
// previous node is gone - retry getChildren
getChildren();
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}

private void getChildren() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.curator.framework.recipes.leader;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -72,9 +73,12 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(CuratorTestBase.zk35TestCompatibilityGroup)
public class TestLeaderLatch extends BaseClassForTests {
private static final Logger LOG = LoggerFactory.getLogger(TestLeaderLatch.class);
private static final String PATH_NAME = "/one/two/me";
private static final int MAX_LOOPS = 5;

Expand Down Expand Up @@ -208,6 +212,58 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception {
}
}

/**
 * Starts two latches on the same path, expires the session of the client that holds
 * leadership, and checks that the other latch gains leadership while the former leader
 * reports the loss and does not report a fresh gain right afterwards.
 */
@Test
public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception {
    final Timing2 timing = new Timing2();
    final String electionPath = "/testSessionInterruptionDoNotCauseBrainSplit";
    final String leaderId = "id0";
    final String followerId = "id1";
    final BlockingQueue<TestEvent> leaderEvents = new LinkedBlockingQueue<>();
    final BlockingQueue<TestEvent> followerEvents = new LinkedBlockingQueue<>();

    // Registered in creation order; released in the opposite order in the finally block.
    final List<Closeable> toClose = new ArrayList<>();
    try {
        final CuratorFramework leaderClient =
                createAndStartClient(server.getConnectString(), timing, leaderId, null);
        toClose.add(leaderClient);
        final LeaderLatch leaderLatch =
                createAndStartLeaderLatch(leaderClient, electionPath, leaderId, leaderEvents);
        toClose.add(leaderLatch);

        // The first latch on the path is expected to report that it gained leadership.
        final TestEvent initialGain =
                leaderEvents.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
        assertThat(initialGain)
                .isNotNull()
                .isEqualTo(new TestEvent(leaderId, TestEventType.GAINED_LEADERSHIP));

        final CuratorFramework followerClient =
                createAndStartClient(server.getConnectString(), timing, followerId, null);
        toClose.add(followerClient);
        final LeaderLatch followerLatch =
                createAndStartLeaderLatch(followerClient, electionPath, followerId, followerEvents);
        toClose.add(followerLatch);

        // Give the non-leading latch time to finish its creation. The sleep is slow but
        // needed: there is no handle that signals the end of the latch's reset call.
        timing.forWaiting().sleepABit();

        assertTrue(leaderLatch.hasLeadership());
        assertFalse(followerLatch.hasLeadership());

        // Force the current leader's ZooKeeper session to expire.
        leaderClient.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();

        final TestEvent followerGain =
                followerEvents.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
        assertThat(followerGain)
                .isNotNull()
                .isEqualTo(new TestEvent(followerId, TestEventType.GAINED_LEADERSHIP));

        final TestEvent leaderLoss =
                leaderEvents.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
        assertThat(leaderLoss)
                .isNotNull()
                .isEqualTo(new TestEvent(leaderId, TestEventType.LOST_LEADERSHIP));

        // No leadership is granted to the old leader after its session changed, hence no split brain.
        final TestEvent trailingEvent = leaderEvents.poll(20, TimeUnit.MILLISECONDS);
        assertThat(trailingEvent)
                .isNotEqualTo(new TestEvent(leaderId, TestEventType.GAINED_LEADERSHIP));
    } finally {
        // Walk backwards so each LeaderLatch is closed before the client it was created with.
        for (int i = toClose.size() - 1; i >= 0; i--) {
            CloseableUtils.closeQuietly(toClose.get(i));
        }
    }
}

@Test
public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception {
final String latchPath = "/test";
Expand Down Expand Up @@ -316,7 +372,9 @@ private static CuratorFramework createAndStartClient(

client.getConnectionStateListenable().addListener((client1, newState) -> {
if (newState == ConnectionState.CONNECTED) {
events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
if (events != null) {
events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
}
}
});

Expand Down Expand Up @@ -366,6 +424,11 @@ public boolean equals(Object o) {
TestEvent testEvent = (TestEvent) o;
return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
}

/**
 * Renders this event as {@code TestEvent{eventType=<type>, id='<id>'}}, which makes
 * failed assertions on events readable.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("TestEvent{");
    text.append("eventType=").append(eventType);
    text.append(", id='").append(id).append('\'');
    text.append('}');
    return text.toString();
}
}

@Test
Expand Down

0 comments on commit 1027c2c

Please sign in to comment.