Skip to content

Commit 7042e77

Browse files
srikanthpadakantiSrikanth Padakanti
andauthored
[Feature Request] Add support for a ForkJoinPool type (#19008)
* Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * [Feature Request] Add support for a ForkJoinPool type - Add changelog entry Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * [Feature Request] Add support for a ForkJoinPool type - Add changelog entry Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add support for a ForkJoinPool type - spotlessApply Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Initial chekpoint Feature Request - Add support for a ForkJoinPool type Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add few more tests to cover the code Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add few more tests to cover the code Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Backward Compatibility : fallback to FIXED for unknown types Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Backward Compatibility : fallback to FIXED for unknown types Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Backward Compatibility : fallback to FIXED for unknown types Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Backward Compatibility : fallback to FIXED for unknown types Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Addressed the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Apply spotlesscheck Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Apply spotlesscheck and merge new changes to my main branch Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Remove the allowlist and the registration method. Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for the awaitTermination in threadpool Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for the awaitTermination in threadpool Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Addressed the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address the PR comments for Yaml tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address code review comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address code review comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address code review comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address code review comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address code review comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address code review comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Resolve merge conflicts Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Introduce new settings in ForkJoinPoolExecutorBuilder Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * overload the constructor and keep the existing version for compatibility. Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Fix the precommit failure issues Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Fix the test failures for queue size Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Fix the test failures for queue size Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Added tests Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Added tests for the codecoverage target Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Changed te scope of buildTable and removed reflection Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Changed te scope of buildTable and removed reflection Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Changed the IT from plugin folder to single node IT with inline plugin registration Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Changed the IT from plugin folder to single node IT with inline plugin registration Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Changed the IT from plugin folder to single node IT with inline plugin registration Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Removed the build.gradle dependency opensearch-common Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Removed the build.gradle dependency opensearch-common Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> --------- Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> Signed-off-by: Srikanth Padakanti <srikanth29.9@gmail.com> Co-authored-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent b0fcdbe commit 7042e77

File tree

16 files changed

+1429
-128
lines changed

16 files changed

+1429
-128
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 3.x]
77
### Added
8+
9+
- Add support for a ForkJoinPool type ([#19008](https://github.com/opensearch-project/OpenSearch/pull/19008))
810
- Add seperate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532))
911
- Use Lucene `pack` method for `half_float` and `usigned_long` when using `ApproximatePointRangeQuery`.
1012
- Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))

qa/evil-tests/src/test/java/org/opensearch/threadpool/EvilThreadPoolTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import java.util.concurrent.ExecutorService;
4848
import java.util.concurrent.ScheduledThreadPoolExecutor;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.ForkJoinPool;
51+
5052
import java.util.concurrent.atomic.AtomicReference;
5153
import java.util.function.Consumer;
5254

@@ -70,6 +72,11 @@ public void tearDownThreadPool() {
7072

7173
public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
7274
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
75+
// ForkJoinPool is skipped here because it does not support all ThreadPoolExecutor features or APIs,
76+
// and is tested separately in testExecutionErrorOnForkJoinPool.
77+
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.FORK_JOIN) {
78+
continue; // skip FORK_JOIN for these tests
79+
}
7380
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
7481
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
7582
checkExecutionError(getScheduleRunner(executor));
@@ -176,6 +183,11 @@ protected void doRun() {
176183

177184
public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
178185
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
186+
// ForkJoinPool is skipped here because it does not support all ThreadPoolExecutor features or APIs,
187+
// and is tested separately in testExecutionErrorOnForkJoinPool.
188+
if (ThreadPool.THREAD_POOL_TYPES.get(executor) == ThreadPool.ThreadPoolType.FORK_JOIN) {
189+
continue; // skip FORK_JOIN for these tests
190+
}
179191
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), true);
180192

181193
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
@@ -391,4 +403,43 @@ private void runExecutionTest(
391403
}
392404
}
393405

406+
public void testExecutionExceptionOnForkJoinPool() throws InterruptedException {
407+
ForkJoinPool fjp = new ForkJoinPool();
408+
try {
409+
checkExecutionException(getExecuteRunner(fjp), true);
410+
checkExecutionException(getSubmitRunner(fjp), false);
411+
} finally {
412+
fjp.shutdownNow();
413+
fjp.awaitTermination(10, TimeUnit.SECONDS);
414+
}
415+
}
416+
417+
public void testExecutionErrorOnForkJoinPool() throws Exception {
418+
ForkJoinPool fjp = new ForkJoinPool(8);
419+
final CountDownLatch latch = new CountDownLatch(1);
420+
final AtomicReference<Throwable> thrown = new AtomicReference<>();
421+
try {
422+
fjp.execute(() -> {
423+
try {
424+
throw new Error("future error");
425+
} catch (Throwable t) {
426+
thrown.set(t);
427+
} finally {
428+
latch.countDown();
429+
}
430+
});
431+
432+
// Wait up to 5 seconds for the task to complete
433+
assertTrue("Timeout waiting for ForkJoinPool task", latch.await(5, TimeUnit.SECONDS));
434+
435+
Throwable error = thrown.get();
436+
assertNotNull("No error captured from ForkJoinPool task", error);
437+
assertTrue(error instanceof Error);
438+
assertEquals("future error", error.getMessage());
439+
} finally {
440+
fjp.shutdownNow();
441+
fjp.awaitTermination(10, TimeUnit.SECONDS);
442+
}
443+
}
444+
394445
}

rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -60,72 +60,72 @@
6060

6161
- match:
6262
$body: |
63-
/ #node_name name active queue rejected
64-
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
63+
/ #node_name name active queue rejected
64+
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
6565
6666
- do:
6767
cat.thread_pool:
68-
v: true
68+
v: true
6969

7070
- match:
71-
$body: |
72-
/^ node_name \s+ name \s+ active \s+ queue \s+ rejected \n
73-
(\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
71+
$body: |
72+
/^ node_name \s+ name \s+ active \s+ queue \s+ rejected \n
73+
(\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
7474
7575
- do:
7676
cat.thread_pool:
77-
h: pid,id,h,i,po
77+
h: pid,id,h,i,po
7878

7979
- match:
8080
$body: |
81-
/ #pid id host ip port
82-
(\d+ \s+ \S+ \s+ \S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ (\d+|-) \n)+ $/
81+
/ #pid id host ip port
82+
(\d+ \s+ \S+ \s+ \S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ (\d+|-) \n)+ $/
8383
8484
- do:
8585
cat.thread_pool:
86-
thread_pool_patterns: write,management,flush,generic,force_merge
87-
h: id,name,active
88-
v: true
86+
thread_pool_patterns: write,management,flush,generic,force_merge
87+
h: id,name,active
88+
v: true
8989

9090
- match:
9191
$body: |
92-
/^ id \s+ name \s+ active \n
93-
(\S+\s+ flush \s+ \d+ \n
94-
\S+\s+ force_merge \s+ \d+ \n
95-
\S+\s+ generic \s+ \d+ \n
96-
\S+\s+ management \s+ \d+ \n
97-
\S+\s+ write \s+ \d+ \n)+ $/
92+
/^ id \s+ name \s+ active \n
93+
(\S+\s+ flush \s+ \d+ \n
94+
\S+\s+ force_merge \s+ \d+ \n
95+
\S+\s+ generic \s+ \d+ \n
96+
\S+\s+ management \s+ \d+ \n
97+
\S+\s+ write \s+ \d+ \n)+ $/
9898
9999
- do:
100100
cat.thread_pool:
101-
thread_pool_patterns: write
102-
h: id,name,type,active,size,queue,queue_size,rejected,largest,completed,min,max,keep_alive
103-
v: true
101+
thread_pool_patterns: write
102+
h: id,name,type,active,size,queue,queue_size,rejected,largest,completed,min,max,keep_alive
103+
v: true
104104

105105
- match:
106106
$body: |
107-
/^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ max \s+ keep_alive \n
108-
(\S+ \s+ write \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \S* \n)+ $/
107+
/^ id \s+ name \s+ type \s+ active \s+ size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ max \s+ keep_alive \n
108+
(\S+ \s+ write \s+ fixed \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \S* \n)+ $/
109109
110110
- do:
111111
cat.thread_pool:
112-
thread_pool_patterns: fetch*
113-
h: id,name,type,active,pool_size,queue,queue_size,rejected,largest,completed,core,max,size,keep_alive
114-
v: true
112+
thread_pool_patterns: fetch*
113+
h: id,name,type,active,pool_size,queue,queue_size,rejected,largest,completed,core,max,size,keep_alive
114+
v: true
115115

116116
- match:
117117
$body: |
118-
/^ id \s+ name \s+ type \s+ active \s+ pool_size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ core \s+ max \s+ size \s+ keep_alive \n
119-
(\S+ \s+ fetch_shard_started \s+ scaling \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \d* \s+ \S* \n
120-
\S+ \s+ fetch_shard_store \s+ scaling \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \d* \s+ \S* \n)+ $/
118+
/^ id \s+ name \s+ type \s+ active \s+ pool_size \s+ queue \s+ queue_size \s+ rejected \s+ largest \s+ completed \s+ core \s+ max \s+ size \s+ keep_alive \n
119+
(\S+ \s+ fetch_shard_started \s+ scaling \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \d* \s+ \S* \n
120+
\S+ \s+ fetch_shard_store \s+ scaling \s+ \d+ \s+ \d+ \s+ \d+ \s+ (-1|\d+) \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \d* \s+ \S* \n)+ $/
121121
122122
- do:
123123
cat.thread_pool:
124-
thread_pool_patterns: write,search
125-
size: ""
124+
thread_pool_patterns: write,search
125+
size: ""
126126

127127
- match:
128128
$body: |
129-
/ #node_name name active queue rejected
130-
^ (\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n
131-
\S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
129+
/ #node_name name active queue rejected
130+
^ (\S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n
131+
\S+ \s+ write \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.threadpool;
10+
11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.plugins.Plugin;
13+
import org.opensearch.test.OpenSearchSingleNodeTestCase;
14+
15+
import java.util.Collection;
16+
import java.util.List;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.ForkJoinPool;
19+
20+
/**
21+
* Single-node IT that defines an inline plugin to register a ForkJoin executor ("jvector")
22+
* and verifies it is available on the node.
23+
*/
24+
public class ForkJoinPoolIT extends OpenSearchSingleNodeTestCase {
25+
26+
/**
27+
* Inline test plugin that registers a ForkJoin-based executor named "jvector"
28+
* with a fixed parallelism of 9 for deterministic assertions.
29+
*/
30+
public static class TestPlugin extends Plugin {
31+
@Override
32+
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
33+
return List.of(new ForkJoinPoolExecutorBuilder("jvector", 9));
34+
}
35+
}
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> getPlugins() {
39+
// Load the inline plugin into the single-node cluster for this test
40+
return List.of(TestPlugin.class);
41+
}
42+
43+
public void testForkJoinPoolExists() {
44+
// Obtain the node's ThreadPool and verify the "jvector" executor
45+
ThreadPool threadPool = getInstanceFromNode(ThreadPool.class);
46+
ExecutorService executor = threadPool.executor("jvector");
47+
assertNotNull("jvector executor should be registered by the test plugin", executor);
48+
assertTrue("jvector should be a ForkJoinPool", executor instanceof ForkJoinPool);
49+
assertEquals("parallelism should be 9", 9, ((ForkJoinPool) executor).getParallelism());
50+
51+
// Also validate ThreadPool.Info reports FORK_JOIN with expected parallelism (max)
52+
ThreadPool.Info info = threadPool.info("jvector");
53+
assertNotNull("ThreadPool.Info for jvector should exist", info);
54+
assertEquals("jvector", info.getName());
55+
assertEquals("type must be FORK_JOIN", ThreadPool.ThreadPoolType.FORK_JOIN, info.getThreadPoolType());
56+
assertEquals("info.max should equal parallelism", 9, info.getMax());
57+
}
58+
}

0 commit comments

Comments
 (0)