Skip to content

Commit b4f3669

Browse files
committed
Release operation permit on thread-pool rejection (#25930)
At the shard level we use an operation permit to coordinate between regular shard operations and special operations that need exclusive access. In ES versions < 6, the operation requiring exclusive access was invoked during primary relocation, but ES versions >= 6 this exclusive access is also used when a replica learns about a new primary or when a replica is promoted to primary. These special operations requiring exclusive access delay regular operations from running, by adding them to a queue, and after finishing the exclusive access, release these operations which then need to be put back on the original thread-pool they were running on. In the presence of thread pool rejections, the current implementation had two issues: - it would not properly release the operation permit when hitting a rejection (i.e. when calling ThreadedActionListener.onResponse from IndexShardOperationPermits.acquire). - it would not invoke the onFailure method of the action listener when the shard was closed, and just log a warning instead (see ThreadedActionListener.onFailure), which would ultimately lead to the replication task never being cleaned up (see #25863). This commit fixes both issues by introducing a custom threaded action listener that is permit-aware and properly deals with rejections. Closes #25863
1 parent 2288f37 commit b4f3669

File tree

2 files changed

+109
-10
lines changed

2 files changed

+109
-10
lines changed

core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
package org.elasticsearch.index.shard;
2121

2222
import org.apache.logging.log4j.Logger;
23+
import org.apache.lucene.util.IOUtils;
2324
import org.elasticsearch.Assertions;
2425
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.action.support.ContextPreservingActionListener;
26-
import org.elasticsearch.action.support.ThreadedActionListener;
2727
import org.elasticsearch.common.CheckedRunnable;
2828
import org.elasticsearch.common.lease.Releasable;
2929
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -198,11 +198,14 @@ private void releaseDelayedOperations() {
198198
/**
199199
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
200200
* {@link ActionListener} will be called on the calling thread. During calls of
201-
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
202-
* will then be called using the provided executor once operations are no longer blocked.
201+
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed.
202+
* The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
203+
* longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
204+
* directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure
205+
* that the {@link ActionListener#onFailure(Exception)} method provided here only contains lightweight operations.
203206
*
204207
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
205-
* @param executorOnDelay executor to use for delayed call
208+
* @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
206209
* @param forceExecution whether the runnable should force its execution in case it gets rejected
207210
*/
208211
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
@@ -217,7 +220,7 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
217220
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
218221
if (executorOnDelay != null) {
219222
delayedOperations.add(
220-
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
223+
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
221224
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
222225
} else {
223226
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
@@ -269,4 +272,56 @@ int getActiveOperationsCount() {
269272
}
270273
}
271274

275+
/**
276+
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
277+
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
278+
* invoker's thread to communicate failures.
279+
*/
280+
private static class PermitAwareThreadedActionListener implements ActionListener<Releasable> {
281+
282+
private final ThreadPool threadPool;
283+
private final String executor;
284+
private final ActionListener<Releasable> listener;
285+
private final boolean forceExecution;
286+
287+
private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener<Releasable> listener,
288+
boolean forceExecution) {
289+
this.threadPool = threadPool;
290+
this.executor = executor;
291+
this.listener = listener;
292+
this.forceExecution = forceExecution;
293+
}
294+
295+
@Override
296+
public void onResponse(final Releasable releasable) {
297+
threadPool.executor(executor).execute(new AbstractRunnable() {
298+
@Override
299+
public boolean isForceExecution() {
300+
return forceExecution;
301+
}
302+
303+
@Override
304+
protected void doRun() throws Exception {
305+
listener.onResponse(releasable);
306+
}
307+
308+
@Override
309+
public void onRejection(Exception e) {
310+
IOUtils.closeWhileHandlingException(releasable);
311+
super.onRejection(e);
312+
}
313+
314+
@Override
315+
public void onFailure(Exception e) {
316+
listener.onFailure(e); // will possibly execute on the caller thread
317+
}
318+
});
319+
}
320+
321+
@Override
322+
public void onFailure(final Exception e) {
323+
listener.onFailure(e);
324+
}
325+
}
326+
272327
}

core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import org.elasticsearch.action.ActionListener;
2222
import org.elasticsearch.action.support.PlainActionFuture;
2323
import org.elasticsearch.common.lease.Releasable;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
26+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2427
import org.elasticsearch.common.util.concurrent.ThreadContext;
2528
import org.elasticsearch.test.ESTestCase;
2629
import org.elasticsearch.threadpool.TestThreadPool;
@@ -47,6 +50,7 @@
4750
import java.util.function.Function;
4851

4952
import static org.hamcrest.Matchers.containsString;
53+
import static org.hamcrest.Matchers.either;
5054
import static org.hamcrest.Matchers.equalTo;
5155
import static org.hamcrest.Matchers.hasToString;
5256
import static org.hamcrest.Matchers.instanceOf;
@@ -59,7 +63,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
5963

6064
@BeforeClass
6165
public static void setupThreadPool() {
62-
threadPool = new TestThreadPool("IndexShardOperationsLockTests");
66+
int bulkThreadPoolSize = randomIntBetween(1, 2);
67+
int bulkThreadPoolQueueSize = randomIntBetween(1, 2);
68+
threadPool = new TestThreadPool("IndexShardOperationsLockTests",
69+
Settings.builder()
70+
.put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize)
71+
.put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize)
72+
.build());
73+
assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class));
74+
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize));
75+
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize));
76+
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(),
77+
equalTo(bulkThreadPoolQueueSize));
6378
}
6479

6580
@AfterClass
@@ -82,33 +97,53 @@ public void checkNoInflightOperations() {
8297
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
8398
int numThreads = 10;
8499

100+
class DummyException extends RuntimeException {}
101+
85102
List<PlainActionFuture<Releasable>> futures = new ArrayList<>();
86103
List<Thread> operationThreads = new ArrayList<>();
87-
CountDownLatch latch = new CountDownLatch(numThreads / 2);
104+
CountDownLatch latch = new CountDownLatch(numThreads / 4);
105+
boolean forceExecution = randomBoolean();
88106
for (int i = 0; i < numThreads; i++) {
107+
// the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool
108+
String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC);
109+
boolean failingListener = randomBoolean();
89110
PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
90111
@Override
91112
public void onResponse(Releasable releasable) {
92113
releasable.close();
93-
super.onResponse(releasable);
114+
if (failingListener) {
115+
throw new DummyException();
116+
} else {
117+
super.onResponse(releasable);
118+
}
94119
}
95120
};
96121
Thread thread = new Thread() {
97122
public void run() {
98123
latch.countDown();
99-
permits.acquire(future, ThreadPool.Names.GENERIC, true);
124+
try {
125+
permits.acquire(future, threadPoolName, forceExecution);
126+
} catch (DummyException dummyException) {
127+
// ok, notify future
128+
assertTrue(failingListener);
129+
future.onFailure(dummyException);
130+
}
100131
}
101132
};
102133
futures.add(future);
103134
operationThreads.add(thread);
104135
}
105136

137+
boolean closeAfterBlocking = randomBoolean();
106138
CountDownLatch blockFinished = new CountDownLatch(1);
107139
threadPool.generic().execute(() -> {
108140
try {
109141
latch.await();
110142
blockAndWait().close();
111143
blockFinished.countDown();
144+
if (closeAfterBlocking) {
145+
permits.close();
146+
}
112147
} catch (InterruptedException e) {
113148
throw new RuntimeException(e);
114149
}
@@ -119,7 +154,16 @@ public void run() {
119154
}
120155

121156
for (PlainActionFuture<Releasable> future : futures) {
122-
assertNotNull(future.get(1, TimeUnit.MINUTES));
157+
try {
158+
assertNotNull(future.get(1, TimeUnit.MINUTES));
159+
} catch (ExecutionException e) {
160+
if (closeAfterBlocking) {
161+
assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class))
162+
.or(instanceOf(IndexShardClosedException.class)));
163+
} else {
164+
assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class)));
165+
}
166+
}
123167
}
124168

125169
for (Thread thread : operationThreads) {

0 commit comments

Comments
 (0)