Skip to content

Commit 7451fa2

Browse files
committed
Release operation permit on thread-pool rejection (#25930)
At the shard level we use an operation permit to coordinate between regular shard operations and special operations that need exclusive access. In ES versions < 6, the operation requiring exclusive access was invoked during primary relocation, but ES versions >= 6 this exclusive access is also used when a replica learns about a new primary or when a replica is promoted to primary. These special operations requiring exclusive access delay regular operations from running, by adding them to a queue, and after finishing the exclusive access, release these operations which then need to be put back on the original thread-pool they were running on. In the presence of thread pool rejections, the current implementation had two issues: - it would not properly release the operation permit when hitting a rejection (i.e. when calling ThreadedActionListener.onResponse from IndexShardOperationPermits.acquire). - it would not invoke the onFailure method of the action listener when the shard was closed, and just log a warning instead (see ThreadedActionListener.onFailure), which would ultimately lead to the replication task never being cleaned up (see #25863). This commit fixes both issues by introducing a custom threaded action listener that is permit-aware and properly deals with rejections. Closes #25863
1 parent 4517515 commit 7451fa2

File tree

3 files changed

+119
-16
lines changed

3 files changed

+119
-16
lines changed

core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
package org.elasticsearch.index.shard;
2020

2121
import org.apache.logging.log4j.Logger;
22+
import org.apache.lucene.util.IOUtils;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.support.ContextPreservingActionListener;
2425
import org.elasticsearch.action.support.ThreadedActionListener;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.lease.Releasable;
28+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2729
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
2830
import org.elasticsearch.threadpool.ThreadPool;
2931

@@ -106,14 +108,17 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)
106108
}
107109

108110
/**
109-
* Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided
110-
* ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock
111-
* acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations
112-
* terminates.
111+
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
112+
* {@link ActionListener} will be called on the calling thread. During calls of
113+
* {@link #blockOperations(long, TimeUnit, Runnable)}, permit acquisition can be delayed.
114+
* The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
115+
* longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
116+
* directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure
117+
* that the {@link ActionListener#onFailure(Exception)} method provided here only contains lightweight operations.
113118
*
114-
* @param onAcquired ActionListener that is invoked once acquisition is successful or failed
115-
* @param executorOnDelay executor to use for delayed call
116-
* @param forceExecution whether the runnable should force its execution in case it gets rejected
119+
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
120+
* @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
121+
* @param forceExecution whether the runnable should force its execution in case it gets rejected
117122
*/
118123
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
119124
if (closed) {
@@ -131,8 +136,7 @@ public void acquire(ActionListener<Releasable> onAcquired, String executorOnDela
131136
}
132137
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
133138
if (executorOnDelay != null) {
134-
delayedOperations.add(
135-
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
139+
delayedOperations.add(new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
136140
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
137141
} else {
138142
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
@@ -168,4 +172,56 @@ public int getActiveOperationsCount() {
168172
return TOTAL_PERMITS - availablePermits;
169173
}
170174
}
175+
176+
/**
177+
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
178+
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
179+
* invoker's thread to communicate failures.
180+
*/
181+
private static class PermitAwareThreadedActionListener implements ActionListener<Releasable> {
182+
183+
private final ThreadPool threadPool;
184+
private final String executor;
185+
private final ActionListener<Releasable> listener;
186+
private final boolean forceExecution;
187+
188+
private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener<Releasable> listener,
189+
boolean forceExecution) {
190+
this.threadPool = threadPool;
191+
this.executor = executor;
192+
this.listener = listener;
193+
this.forceExecution = forceExecution;
194+
}
195+
196+
@Override
197+
public void onResponse(final Releasable releasable) {
198+
threadPool.executor(executor).execute(new AbstractRunnable() {
199+
@Override
200+
public boolean isForceExecution() {
201+
return forceExecution;
202+
}
203+
204+
@Override
205+
protected void doRun() throws Exception {
206+
listener.onResponse(releasable);
207+
}
208+
209+
@Override
210+
public void onRejection(Exception e) {
211+
IOUtils.closeWhileHandlingException(releasable);
212+
super.onRejection(e);
213+
}
214+
215+
@Override
216+
public void onFailure(Exception e) {
217+
listener.onFailure(e); // will possibly execute on the caller thread
218+
}
219+
});
220+
}
221+
222+
@Override
223+
public void onFailure(final Exception e) {
224+
listener.onFailure(e);
225+
}
226+
}
171227
}

core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import org.elasticsearch.action.ActionListener;
2222
import org.elasticsearch.action.support.PlainActionFuture;
2323
import org.elasticsearch.common.lease.Releasable;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
26+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2427
import org.elasticsearch.common.util.concurrent.ThreadContext;
2528
import org.elasticsearch.test.ESTestCase;
2629
import org.elasticsearch.threadpool.TestThreadPool;
@@ -39,6 +42,7 @@
3942
import java.util.function.Function;
4043
import java.util.function.Supplier;
4144

45+
import static org.hamcrest.Matchers.either;
4246
import static org.hamcrest.Matchers.equalTo;
4347
import static org.hamcrest.Matchers.instanceOf;
4448

@@ -50,7 +54,18 @@ public class IndexShardOperationsLockTests extends ESTestCase {
5054

5155
@BeforeClass
5256
public static void setupThreadPool() {
53-
threadPool = new TestThreadPool("IndexShardOperationsLockTests");
57+
int bulkThreadPoolSize = randomIntBetween(1, 2);
58+
int bulkThreadPoolQueueSize = randomIntBetween(1, 2);
59+
threadPool = new TestThreadPool("IndexShardOperationsLockTests",
60+
Settings.builder()
61+
.put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize)
62+
.put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize)
63+
.build());
64+
assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class));
65+
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize));
66+
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize));
67+
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(),
68+
equalTo(bulkThreadPoolQueueSize));
5469
}
5570

5671
@AfterClass
@@ -73,33 +88,53 @@ public void checkNoInflightOperations() {
7388
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
7489
int numThreads = 10;
7590

91+
class DummyException extends RuntimeException {}
92+
7693
List<PlainActionFuture<Releasable>> futures = new ArrayList<>();
7794
List<Thread> operationThreads = new ArrayList<>();
78-
CountDownLatch latch = new CountDownLatch(numThreads / 2);
95+
CountDownLatch latch = new CountDownLatch(numThreads / 4);
96+
boolean forceExecution = randomBoolean();
7997
for (int i = 0; i < numThreads; i++) {
98+
// the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool
99+
String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC);
100+
boolean failingListener = randomBoolean();
80101
PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
81102
@Override
82103
public void onResponse(Releasable releasable) {
83104
releasable.close();
84-
super.onResponse(releasable);
105+
if (failingListener) {
106+
throw new DummyException();
107+
} else {
108+
super.onResponse(releasable);
109+
}
85110
}
86111
};
87112
Thread thread = new Thread() {
88113
public void run() {
89114
latch.countDown();
90-
block.acquire(future, ThreadPool.Names.GENERIC, true);
115+
try {
116+
block.acquire(future, threadPoolName, forceExecution);
117+
} catch (DummyException dummyException) {
118+
// ok, notify future
119+
assertTrue(failingListener);
120+
future.onFailure(dummyException);
121+
}
91122
}
92123
};
93124
futures.add(future);
94125
operationThreads.add(thread);
95126
}
96127

128+
boolean closeAfterBlocking = randomBoolean();
97129
CountDownLatch blockFinished = new CountDownLatch(1);
98130
threadPool.generic().execute(() -> {
99131
try {
100132
latch.await();
101133
blockAndWait().close();
102134
blockFinished.countDown();
135+
if (closeAfterBlocking) {
136+
block.close();
137+
}
103138
} catch (InterruptedException e) {
104139
throw new RuntimeException(e);
105140
}
@@ -110,7 +145,16 @@ public void run() {
110145
}
111146

112147
for (PlainActionFuture<Releasable> future : futures) {
113-
assertNotNull(future.get(1, TimeUnit.MINUTES));
148+
try {
149+
assertNotNull(future.get(1, TimeUnit.MINUTES));
150+
} catch (ExecutionException e) {
151+
if (closeAfterBlocking) {
152+
assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class))
153+
.or(instanceOf(IndexShardClosedException.class)));
154+
} else {
155+
assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class)));
156+
}
157+
}
114158
}
115159

116160
for (Thread thread : operationThreads) {
@@ -120,7 +164,6 @@ public void run() {
120164
blockFinished.await();
121165
}
122166

123-
124167
public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException {
125168
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
126169
block.acquire(future, ThreadPool.Names.GENERIC, true);

test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
public class TestThreadPool extends ThreadPool {
2626

2727
public TestThreadPool(String name) {
28-
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build());
28+
this(name, Settings.EMPTY);
29+
}
30+
31+
public TestThreadPool(String name, Settings settings) {
32+
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build());
2933
}
3034

3135
}

0 commit comments

Comments
 (0)