From fa560e8bd16d183a56a0ad2b069c069609886c03 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 30 Oct 2023 14:54:50 -0400 Subject: [PATCH] Fix tests and test skaffolding Signed-off-by: Andriy Redko --- .../discovery/ClusterManagerDisruptionIT.java | 3 ++ .../StableClusterManagerDisruptionIT.java | 4 ++ .../common/util/concurrent/BaseFuture.java | 4 +- .../MockSinglePrioritizingExecutor.java | 39 +++++++++++-------- .../disruption/LongGCDisruptionTests.java | 8 ++++ 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java index 9aee6f7f7a192..9bd924de5d9c7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java @@ -58,7 +58,9 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; +import static org.junit.Assume.assumeThat; /** * Tests relating to the loss of the cluster-manager. @@ -71,6 +73,7 @@ public class ClusterManagerDisruptionIT extends AbstractDisruptionTestCase { */ public void testClusterManagerNodeGCs() throws Exception { List nodes = startCluster(3); + assumeThat("Thread::resume / Thread::suspend are not supported anymore", Runtime.version(), lessThan(Runtime.Version.parse("20"))); String oldClusterManagerNode = internalCluster().getClusterManagerName(); // a very long GC, but it's OK as we remove the disruption when it has had an effect diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/StableClusterManagerDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/StableClusterManagerDisruptionIT.java index c12718704e194..f6f2b2dbfd096 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/StableClusterManagerDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/StableClusterManagerDisruptionIT.java @@ -71,6 +71,8 @@ import static java.util.Collections.singleton; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assume.assumeThat; /** * Tests relating to the loss of the cluster-manager, but which work with the default fault detection settings which are rather lenient and will @@ -195,6 +197,8 @@ private void testFollowerCheckerAfterClusterManagerReelection(NetworkLinkDisrupt * following another elected cluster-manager node. These nodes should reject this cluster state and prevent them from following the stale cluster-manager. */ public void testStaleClusterManagerNotHijackingMajority() throws Exception { + assumeThat("Thread::resume / Thread::suspend are not supported anymore", Runtime.version(), lessThan(Runtime.Version.parse("20"))); + final List nodes = internalCluster().startNodes( 3, Settings.builder() diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java index 47fc4fc33bd74..a423e5572d981 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java @@ -74,7 +74,7 @@ public abstract class BaseFuture implements Future { * * @throws InterruptedException if the current thread was interrupted before * or during the call (optional but recommended). - * @throws CancellationException {@inheritDoc} + * @throws CancellationException if the computation was cancelled */ @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { @@ -96,7 +96,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutEx * * @throws InterruptedException if the current thread was interrupted before * or during the call (optional but recommended). - * @throws CancellationException {@inheritDoc} + * @throws CancellationException if the computation was cancelled */ @Override public V get() throws InterruptedException, ExecutionException { diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/MockSinglePrioritizingExecutor.java index 10a6ec791ec66..f9b78b26c170e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/MockSinglePrioritizingExecutor.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/MockSinglePrioritizingExecutor.java @@ -44,25 +44,30 @@ public class MockSinglePrioritizingExecutor extends PrioritizedOpenSearchThreadPoolExecutor { public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) { - super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() { - @Override - public void start() { - deterministicTaskQueue.scheduleNow(new Runnable() { - @Override - public void run() { - try { - r.run(); - } catch (KillWorkerError kwe) { - // hacks everywhere - } + super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> { + // This executor used to override Thread::start method so the actual runnable is + // being scheduled in the scope of current thread of execution. In JDK-19, the Thread::start + // is not called anymore (https://bugs.openjdk.org/browse/JDK-8292027) and there is no + // suitable option to alter the executor's behavior in the similar way. The closest we + // could get to is to schedule the runnable once the ThreadFactory is being asked to + // allocate the new thread. + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + try { + r.run(); + } catch (KillWorkerError kwe) { + // hacks everywhere } + } - @Override - public String toString() { - return r.toString(); - } - }); - } + @Override + public String toString() { + return r.toString(); + } + }); + + return new Thread(() -> {}); }, threadPool.getThreadContext(), threadPool.scheduler()); } diff --git a/test/framework/src/test/java/org/opensearch/test/disruption/LongGCDisruptionTests.java b/test/framework/src/test/java/org/opensearch/test/disruption/LongGCDisruptionTests.java index f545577ec5cc4..81292b22345a0 100644 --- a/test/framework/src/test/java/org/opensearch/test/disruption/LongGCDisruptionTests.java +++ b/test/framework/src/test/java/org/opensearch/test/disruption/LongGCDisruptionTests.java @@ -48,6 +48,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assume.assumeThat; public class LongGCDisruptionTests extends OpenSearchTestCase { @@ -65,6 +67,8 @@ public void executeLocked(Runnable r) { } public void testBlockingTimeout() throws Exception { + assumeThat("Thread::resume / Thread::suspend are not supported anymore", Runtime.version(), lessThan(Runtime.Version.parse("20"))); + final String nodeName = "test_node"; LongGCDisruption disruption = new LongGCDisruption(random(), nodeName) { @Override @@ -125,6 +129,8 @@ protected long getSuspendingTimeoutInMillis() { * but does keep retrying until all threads can be safely paused */ public void testNotBlockingUnsafeStackTraces() throws Exception { + assumeThat("Thread::resume / Thread::suspend are not supported anymore", Runtime.version(), lessThan(Runtime.Version.parse("20"))); + final String nodeName = "test_node"; LongGCDisruption disruption = new LongGCDisruption(random(), nodeName) { @Override @@ -179,6 +185,8 @@ protected Pattern[] getUnsafeClasses() { } public void testBlockDetection() throws Exception { + assumeThat("Thread::resume / Thread::suspend are not supported anymore", Runtime.version(), lessThan(Runtime.Version.parse("20"))); + final String disruptedNodeName = "disrupted_node"; final String blockedNodeName = "blocked_node"; CountDownLatch waitForBlockDetectionResult = new CountDownLatch(1);