Skip to content

Commit

Permalink
CURATOR-699. Upgrade ZooKeeper version to 3.9 (#496)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored Feb 15, 2024
1 parent 2f1fc4e commit 972fffa
Show file tree
Hide file tree
Showing 17 changed files with 633 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
unittest:
name: Unit tests
runs-on: ubuntu-latest
timeout-minutes: 120
timeout-minutes: 180
strategy:
fail-fast: false
matrix:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,8 @@ public class CuratorTransactionResult {
* @param forPath path
* @return predicate
*/
public static Predicate<CuratorTransactionResult> ofTypeAndPath(final OperationType type, final String forPath) {
return new Predicate<CuratorTransactionResult>() {
@Override
public boolean apply(CuratorTransactionResult result) {
return (result.getType() == type) && result.getForPath().equals(forPath);
}
};
public static Predicate<CuratorTransactionResult> ofTypeAndPath(OperationType type, String forPath) {
return result -> (result.getType() == type) && result.getForPath().equals(forPath);
}

public CuratorTransactionResult(OperationType type, String forPath, String resultPath, Stat resultStat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ public void testErrors() throws Exception {
CuratorOp createOp1 = client.transactionOp().create().forPath("/bar");
CuratorOp createOp2 = client.transactionOp().create().forPath("/z/blue");
final BlockingQueue<CuratorEvent> callbackQueue = new LinkedBlockingQueue<>();
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
callbackQueue.add(event);
}
};
BackgroundCallback callback = (client1, event) -> callbackQueue.add(event);
client.transaction().inBackground(callback).forOperations(createOp1, createOp2);
CuratorEvent event = callbackQueue.poll(new Timing().milliseconds(), TimeUnit.MILLISECONDS);
assertNotNull(event);
Expand Down Expand Up @@ -124,13 +119,13 @@ public void testWithNamespace() throws Exception {
Collection<CuratorTransactionResult> results =
client.transaction().forOperations(createOp1, createOp2, setDataOp, createOp3, deleteOp);

assertTrue(client.checkExists().forPath("/foo") != null);
assertTrue(client.usingNamespace(null).checkExists().forPath("/galt/foo") != null);
assertNotNull(client.checkExists().forPath("/foo"));
assertNotNull(client.usingNamespace(null).checkExists().forPath("/galt/foo"));
assertArrayEquals(client.getData().forPath("/foo"), "two".getBytes());
assertTrue(client.checkExists().forPath("/foo/bar") == null);
assertNull(client.checkExists().forPath("/foo/bar"));

CuratorTransactionResult ephemeralResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/test-"));
assertNotNull(ephemeralResult);
assertNotEquals(ephemeralResult.getResultPath(), "/test-");
assertTrue(ephemeralResult.getResultPath().startsWith("/test-"));
Expand All @@ -149,13 +144,13 @@ public void testBasic() throws Exception {

Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp1, createOp2);

assertTrue(client.checkExists().forPath("/foo/bar") != null);
assertNotNull(client.checkExists().forPath("/foo/bar"));
assertArrayEquals(client.getData().forPath("/foo/bar"), "snafu".getBytes());

CuratorTransactionResult fooResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/foo"));
CuratorTransactionResult fooBarResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
assertNotNull(fooResult);
assertNotNull(fooBarResult);
assertNotSame(fooResult, fooBarResult);
Expand All @@ -175,23 +170,18 @@ public void testBackground() throws Exception {
CuratorOp createOp2 = client.transactionOp().create().forPath("/foo/bar", "snafu".getBytes());

final BlockingQueue<List<CuratorTransactionResult>> queue = Queues.newLinkedBlockingQueue();
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
queue.add(event.getOpResults());
}
};
BackgroundCallback callback = (client1, event) -> queue.add(event.getOpResults());
client.transaction().inBackground(callback).forOperations(createOp1, createOp2);
Collection<CuratorTransactionResult> results = queue.poll(5, TimeUnit.SECONDS);

assertNotNull(results);
assertTrue(client.checkExists().forPath("/foo/bar") != null);
assertNotNull(client.checkExists().forPath("/foo/bar"));
assertArrayEquals(client.getData().forPath("/foo/bar"), "snafu".getBytes());

CuratorTransactionResult fooResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/foo"));
CuratorTransactionResult fooBarResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
assertNotNull(fooResult);
assertNotNull(fooBarResult);
assertNotSame(fooResult, fooBarResult);
Expand Down Expand Up @@ -221,26 +211,21 @@ public void testBackgroundWithNamespace() throws Exception {
CuratorOp deleteOp = client.transactionOp().delete().forPath("/foo/bar");

final BlockingQueue<List<CuratorTransactionResult>> queue = Queues.newLinkedBlockingQueue();
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
queue.add(event.getOpResults());
}
};
BackgroundCallback callback = (client1, event) -> queue.add(event.getOpResults());
client.transaction()
.inBackground(callback)
.forOperations(createOp1, createOp2, setDataOp, createOp3, deleteOp);

Collection<CuratorTransactionResult> results = queue.poll(5, TimeUnit.SECONDS);

assertNotNull(results);
assertTrue(client.checkExists().forPath("/foo") != null);
assertTrue(client.usingNamespace(null).checkExists().forPath("/galt/foo") != null);
assertNotNull(client.checkExists().forPath("/foo"));
assertNotNull(client.usingNamespace(null).checkExists().forPath("/galt/foo"));
assertArrayEquals(client.getData().forPath("/foo"), "two".getBytes());
assertTrue(client.checkExists().forPath("/foo/bar") == null);
assertNull(client.checkExists().forPath("/foo/bar"));

CuratorTransactionResult ephemeralResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/test-"));
assertNotNull(ephemeralResult);
assertNotEquals(ephemeralResult.getResultPath(), "/test-");
assertTrue(ephemeralResult.getResultPath().startsWith("/test-"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ public void testWithNamespace() throws Exception {
.and()
.commit();

assertTrue(client.checkExists().forPath("/foo") != null);
assertTrue(client.usingNamespace(null).checkExists().forPath("/galt/foo") != null);
assertNotNull(client.checkExists().forPath("/foo"));
assertNotNull(client.usingNamespace(null).checkExists().forPath("/galt/foo"));
assertArrayEquals(client.getData().forPath("/foo"), "two".getBytes());
assertTrue(client.checkExists().forPath("/foo/bar") == null);
assertNull(client.checkExists().forPath("/foo/bar"));

CuratorTransactionResult ephemeralResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/test-"));
assertNotNull(ephemeralResult);
assertNotEquals(ephemeralResult.getResultPath(), "/test-");
assertTrue(ephemeralResult.getResultPath().startsWith("/test-"));
Expand Down Expand Up @@ -154,20 +154,20 @@ public void testWithCompression() throws Exception {
.and()
.commit();

assertTrue(client.checkExists().forPath("/foo") != null);
assertNotNull(client.checkExists().forPath("/foo"));
assertArrayEquals(client.getData().decompressed().forPath("/foo"), "five".getBytes());

assertTrue(client.checkExists().forPath("/bar") != null);
assertNotNull(client.checkExists().forPath("/bar"));
assertArrayEquals(client.getData().decompressed().forPath("/bar"), "two".getBytes());
assertEquals(client.getACL().forPath("/bar"), ZooDefs.Ids.READ_ACL_UNSAFE);

CuratorTransactionResult ephemeralResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/test-"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/test-"));
assertNotNull(ephemeralResult);
assertNotEquals(ephemeralResult.getResultPath(), "/test-");
assertTrue(ephemeralResult.getResultPath().startsWith("/test-"));

assertTrue(client.checkExists().forPath("/baz") != null);
assertNotNull(client.checkExists().forPath("/baz"));
assertArrayEquals(client.getData().decompressed().forPath("/baz"), "four".getBytes());
assertEquals(client.getACL().forPath("/baz"), ZooDefs.Ids.READ_ACL_UNSAFE);
} finally {
Expand All @@ -189,13 +189,13 @@ public void testBasic() throws Exception {
.and()
.commit();

assertTrue(client.checkExists().forPath("/foo/bar") != null);
assertNotNull(client.checkExists().forPath("/foo/bar"));
assertArrayEquals(client.getData().forPath("/foo/bar"), "snafu".getBytes());

CuratorTransactionResult fooResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/foo"));
CuratorTransactionResult fooBarResult =
Iterables.find(results, CuratorTransactionResult.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
Iterables.find(results, TransactionsHelper.ofTypeAndPath(OperationType.CREATE, "/foo/bar"));
assertNotNull(fooResult);
assertNotNull(fooBarResult);
assertNotSame(fooResult, fooBarResult);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.framework.imps;

import com.google.common.base.Predicate;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.api.transaction.OperationType;

public class TransactionsHelper {
public static Predicate<CuratorTransactionResult> ofTypeAndPath(OperationType type, String forPath) {
return result -> (result.getType() == type) && result.getForPath().equals(forPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
package org.apache.curator.framework.recipes.leader;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.curator.test.Compatibility;
import org.apache.curator.test.TestingZooKeeperMain;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZooKeeperServer;
Expand Down Expand Up @@ -81,22 +79,19 @@ public void submitRequest(Request si) {
&& si.type != ZooDefs.OpCode.ping
&& firstError != 0
&& remaining > 0) {
log.debug("Rejected : " + si.toString());
log.debug("Rejected : {}", si);
// Still reject request
log.debug("Still not ready for " + remaining + "ms");
Compatibility.serverCnxnClose(si.cnxn);
return;
}
// Submit the request to the legacy Zookeeper server
log.debug("Applied : " + si.toString());
log.debug("Applied : {}", si);
super.submitRequest(si);
// Raise an error if a lock is created
if ((si.type == ZooDefs.OpCode.create) || (si.type == ZooDefs.OpCode.create2)) {
CreateRequest createRequest = new CreateRequest();
try {
ByteBuffer duplicate = si.request.duplicate();
duplicate.rewind();
ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest);
CreateRequest createRequest = si.readRequestRecord(CreateRequest::new);
if (createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX) && firstError == 0) {
firstError = System.currentTimeMillis();
// The znode has been created, close the connection and don't tell it to client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test cases designed after CURATOR-45
* Test cases designed after CURATOR-45.
*/
@Tag("master")
public class TestLeaderSelectorEdges extends BaseClassForTests {
private final Logger log = LoggerFactory.getLogger(getClass());

Expand All @@ -59,8 +61,6 @@ public static void resetCNXFactory() {
/**
* Create a LeaderSelector but close the connection right after the "lock" znode
* has been created.
*
* @throws Exception
*/
@Test
public void flappingTest() throws Exception {
Expand All @@ -81,7 +81,7 @@ public void flappingTest() throws Exception {
// At this point the ChaosMonkeyZookeeperServer must close the connection
// right after the lock znode is created.
assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost");
// Check that leader ship has failed
// Check that leadership has failed
assertEquals(listener.takeLeadership.getCount(), 1);
// Wait FailedDelete
Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
Expand Down
2 changes: 1 addition & 1 deletion curator-test-zk35/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@
<dependency>org.apache.curator:curator-recipes</dependency>
</dependenciesToScan>
<groups>zk35TestCompatibility</groups>
<excludedGroups>zk36,zk37</excludedGroups>
<excludedGroups>zk36</excludedGroups>
</configuration>
</plugin>
</plugins>
Expand Down
1 change: 0 additions & 1 deletion curator-test-zk36/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@
<dependency>org.apache.curator:curator-client</dependency>
</dependenciesToScan>
<groups>zk36,zk35TestCompatibility</groups>
<excludedGroups>zk37</excludedGroups>
</configuration>
</plugin>
</plugins>
Expand Down
Loading

0 comments on commit 972fffa

Please sign in to comment.