Skip to content

Commit 44a1071

Browse files
authored
Make recovery source partially non-blocking (#37291)
Today a peer-recovery may run into a deadlock if the value of node_concurrent_recoveries is too high. This happens because the peer-recovery is executed in a blocking fashion. This commit attempts to make the recovery source partially non-blocking. I will make three follow-ups to make it fully non-blocking: (1) send translog operations, (2) primary relocation, (3) send commit files. Relates #36195
1 parent 63fe3c6 commit 44a1071

File tree

7 files changed

+149
-77
lines changed

7 files changed

+149
-77
lines changed

server/src/main/java/org/elasticsearch/common/util/CancellableThreads.java

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.common.util;
2020

21+
import org.apache.lucene.util.SetOnce;
2122
import org.apache.lucene.util.ThreadInterruptedException;
2223
import org.elasticsearch.ElasticsearchException;
2324
import org.elasticsearch.common.Nullable;
@@ -38,34 +39,35 @@ public class CancellableThreads {
3839
private final Set<Thread> threads = new HashSet<>();
3940
// needs to be volatile as it is also read outside of synchronized blocks.
4041
private volatile boolean cancelled = false;
42+
private final SetOnce<OnCancel> onCancel = new SetOnce<>();
4143
private String reason;
4244

4345
public synchronized boolean isCancelled() {
4446
return cancelled;
4547
}
4648

47-
48-
/** call this will throw an exception if operation was cancelled.
49-
* Override {@link #onCancel(String, Exception)} for custom failure logic */
50-
public synchronized void checkForCancel() {
51-
if (isCancelled()) {
52-
onCancel(reason, null);
53-
}
49+
public void checkForCancel() {
50+
checkForCancel(null);
5451
}
5552

56-
/**
57-
* called if {@link #checkForCancel()} was invoked after the operation was cancelled.
58-
* the default implementation always throws an {@link ExecutionCancelledException}, suppressing
59-
* any other exception that occurred before cancellation
60-
* @param reason reason for failure supplied by the caller of {@link #cancel}
61-
* @param suppressedException any error that was encountered during the execution before the operation was cancelled.
62-
*/
63-
protected void onCancel(String reason, @Nullable Exception suppressedException) {
64-
RuntimeException e = new ExecutionCancelledException("operation was cancelled reason [" + reason + "]");
65-
if (suppressedException != null) {
66-
e.addSuppressed(suppressedException);
53+
private void checkForCancel(Exception beforeCancelException) {
54+
if (isCancelled()) {
55+
final String reason;
56+
final OnCancel onCancel;
57+
synchronized (this) {
58+
reason = this.reason;
59+
onCancel = this.onCancel.get();
60+
}
61+
if (onCancel != null) {
62+
onCancel.onCancel(reason, beforeCancelException);
63+
}
64+
// fallback to the default exception
65+
final RuntimeException cancelExp = new ExecutionCancelledException("operation was cancelled reason [" + reason + "]");
66+
if (beforeCancelException != null) {
67+
cancelExp.addSuppressed(beforeCancelException);
68+
}
69+
throw cancelExp;
6770
}
68-
throw e;
6971
}
7072

7173
private synchronized boolean add() {
@@ -125,17 +127,14 @@ public void executeIO(IOInterruptible interruptible) throws IOException {
125127
// clear the flag interrupted flag as we are checking for failure..
126128
Thread.interrupted();
127129
}
128-
synchronized (this) {
129-
if (isCancelled()) {
130-
onCancel(reason, ioException != null ? ioException : runtimeException);
131-
} else if (ioException != null) {
132-
// if we're not canceling, we throw the original exception
133-
throw ioException;
134-
}
135-
if (runtimeException != null) {
136-
// if we're not canceling, we throw the original exception
137-
throw runtimeException;
138-
}
130+
checkForCancel(ioException != null ? ioException : runtimeException);
131+
if (ioException != null) {
132+
// if we're not canceling, we throw the original exception
133+
throw ioException;
134+
}
135+
if (runtimeException != null) {
136+
// if we're not canceling, we throw the original exception
137+
throw runtimeException;
139138
}
140139
if (cancelledByExternalInterrupt) {
141140
// restore interrupt flag to at least adhere to expected behavior
@@ -185,4 +184,26 @@ public ExecutionCancelledException(StreamInput in) throws IOException {
185184
super(in);
186185
}
187186
}
187+
188+
/**
189+
* Registers a callback that will be invoked when some running operations are cancelled or {@link #checkForCancel()} is called.
190+
*/
191+
public synchronized void setOnCancel(OnCancel onCancel) {
192+
this.onCancel.set(onCancel);
193+
}
194+
195+
@FunctionalInterface
196+
public interface OnCancel {
197+
/**
198+
* Called when some running operations are cancelled or {@link #checkForCancel()} is explicitly called.
199+
* If this method throws an exception, cancelling tasks will fail with that exception; otherwise they
200+
* will fail with the default exception {@link ExecutionCancelledException}.
201+
*
202+
* @param reason the reason of the cancellation
203+
* @param beforeCancelException any error that was encountered during the execution before the operations were cancelled.
204+
* @see #checkForCancel()
205+
* @see #setOnCancel(OnCancel)
206+
*/
207+
void onCancel(String reason, @Nullable Exception beforeCancelException);
208+
}
188209
}

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.ExceptionsHelper;
25+
import org.elasticsearch.action.ActionListener;
26+
import org.elasticsearch.action.support.HandledTransportAction;
2527
import org.elasticsearch.cluster.routing.ShardRouting;
2628
import org.elasticsearch.common.Nullable;
2729
import org.elasticsearch.common.inject.Inject;
@@ -81,7 +83,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
8183
}
8284
}
8385

84-
private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException {
86+
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) throws IOException {
8587
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
8688
final IndexShard shard = indexService.getShard(request.shardId().id());
8789

@@ -101,18 +103,13 @@ private RecoveryResponse recover(final StartRecoveryRequest request) throws IOEx
101103
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
102104
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
103105
request.targetNode());
104-
try {
105-
return handler.recoverToTarget();
106-
} finally {
107-
ongoingRecoveries.remove(shard, handler);
108-
}
106+
handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));
109107
}
110108

111109
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
112110
@Override
113111
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
114-
RecoveryResponse response = recover(request);
115-
channel.sendResponse(response);
112+
recover(request, new HandledTransportAction.ChannelActionListener<>(channel, Actions.START_RECOVERY, request));
116113
}
117114
}
118115

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import java.io.IOException;
2727
import java.util.List;
2828

29-
final class RecoveryResponse extends TransportResponse {
29+
public final class RecoveryResponse extends TransportResponse {
3030

3131
final List<String> phase1FileNames;
3232
final List<Long> phase1FileSizes;

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.action.ActionListener;
3535
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3636
import org.elasticsearch.cluster.routing.ShardRouting;
37-
import org.elasticsearch.common.Nullable;
3837
import org.elasticsearch.common.StopWatch;
3938
import org.elasticsearch.common.bytes.BytesArray;
4039
import org.elasticsearch.common.lease.Releasable;
@@ -70,6 +69,7 @@
7069
import java.util.List;
7170
import java.util.Locale;
7271
import java.util.concurrent.CompletableFuture;
72+
import java.util.concurrent.CopyOnWriteArrayList;
7373
import java.util.concurrent.atomic.AtomicLong;
7474
import java.util.function.Function;
7575
import java.util.function.Supplier;
@@ -96,22 +96,7 @@ public class RecoverySourceHandler {
9696
private final StartRecoveryRequest request;
9797
private final int chunkSizeInBytes;
9898
private final RecoveryTargetHandler recoveryTarget;
99-
100-
private final CancellableThreads cancellableThreads = new CancellableThreads() {
101-
@Override
102-
protected void onCancel(String reason, @Nullable Exception suppressedException) {
103-
RuntimeException e;
104-
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
105-
e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
106-
} else {
107-
e = new ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
108-
}
109-
if (suppressedException != null) {
110-
e.addSuppressed(suppressedException);
111-
}
112-
throw e;
113-
}
114-
};
99+
private final CancellableThreads cancellableThreads = new CancellableThreads();
115100

116101
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
117102
final StartRecoveryRequest request,
@@ -131,19 +116,37 @@ public StartRecoveryRequest getRequest() {
131116
/**
132117
* performs the recovery from the local engine to the target
133118
*/
134-
public RecoveryResponse recoverToTarget() throws IOException {
135-
runUnderPrimaryPermit(() -> {
136-
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
137-
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
138-
if (targetShardRouting == null) {
139-
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
140-
request.targetNode());
141-
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
142-
}
143-
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
144-
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
145-
146-
try (Closeable ignored = shard.acquireRetentionLockForPeerRecovery()) {
119+
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
120+
final List<Closeable> resources = new CopyOnWriteArrayList<>();
121+
final Closeable releaseResources = () -> IOUtils.close(resources);
122+
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
123+
try {
124+
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
125+
final RuntimeException e;
126+
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
127+
e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
128+
} else {
129+
e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
130+
}
131+
if (beforeCancelEx != null) {
132+
e.addSuppressed(beforeCancelEx);
133+
}
134+
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
135+
throw e;
136+
});
137+
runUnderPrimaryPermit(() -> {
138+
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
139+
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
140+
if (targetShardRouting == null) {
141+
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
142+
request.targetNode());
143+
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
144+
}
145+
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
146+
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
147+
shard, cancellableThreads, logger);
148+
final Closeable retentionLock = shard.acquireRetentionLockForPeerRecovery();
149+
resources.add(retentionLock);
147150
final long startingSeqNo;
148151
final long requiredSeqNoRangeStart;
149152
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
@@ -217,6 +220,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
217220
}
218221
final SendSnapshotResult sendSnapshotResult;
219222
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
223+
// we can release the retention lock here because the snapshot itself will retain the required operations.
224+
IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
220225
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
221226
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
222227
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
@@ -229,10 +234,16 @@ public RecoveryResponse recoverToTarget() throws IOException {
229234

230235
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
231236
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
232-
return new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
233-
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
234-
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
235-
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
237+
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
238+
IOUtils.close(resources);
239+
wrappedListener.onResponse(
240+
new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
241+
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
242+
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
243+
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
244+
);
245+
} catch (Exception e) {
246+
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
236247
}
237248
}
238249

server/src/test/java/org/elasticsearch/common/util/CancellableThreadsTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
import org.hamcrest.Matchers;
2525

2626
import java.io.IOException;
27+
import java.util.Arrays;
2728
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import static org.elasticsearch.common.util.CancellableThreads.ExecutionCancelledException;
32+
import static org.hamcrest.Matchers.equalTo;
2833

2934
public class CancellableThreadsTests extends ESTestCase {
3035
public static class CustomException extends RuntimeException {
@@ -39,6 +44,8 @@ public IOCustomException(String msg) {
3944
}
4045
}
4146

47+
static class ThrowOnCancelException extends RuntimeException {
48+
}
4249

4350
private class TestPlan {
4451
public final int id;
@@ -167,6 +174,19 @@ public void testCancellableThreads() throws InterruptedException {
167174
}
168175

169176
readyForCancel.await();
177+
final boolean throwInOnCancel = randomBoolean();
178+
final AtomicInteger invokeTimes = new AtomicInteger();
179+
cancellableThreads.setOnCancel((reason, beforeCancelException) -> {
180+
invokeTimes.getAndIncrement();
181+
if (throwInOnCancel) {
182+
ThrowOnCancelException e = new ThrowOnCancelException();
183+
if (beforeCancelException != null) {
184+
e.addSuppressed(beforeCancelException);
185+
}
186+
throw e;
187+
}
188+
});
189+
170190
cancellableThreads.cancel("test");
171191
for (Thread thread : threads) {
172192
thread.join(20000);
@@ -181,7 +201,11 @@ public void testCancellableThreads() throws InterruptedException {
181201
assertNull(exceptions[i]);
182202
} else {
183203
// in all other cases, we expect a cancellation exception.
184-
assertThat(exceptions[i], Matchers.instanceOf(CancellableThreads.ExecutionCancelledException.class));
204+
if (throwInOnCancel) {
205+
assertThat(exceptions[i], Matchers.instanceOf(ThrowOnCancelException.class));
206+
} else {
207+
assertThat(exceptions[i], Matchers.instanceOf(ExecutionCancelledException.class));
208+
}
185209
if (plan.exceptAfterCancel) {
186210
assertThat(exceptions[i].getSuppressed(),
187211
Matchers.arrayContaining(
@@ -191,8 +215,17 @@ public void testCancellableThreads() throws InterruptedException {
191215
assertThat(exceptions[i].getSuppressed(), Matchers.emptyArray());
192216
}
193217
}
194-
assertThat(interrupted[plan.id], Matchers.equalTo(plan.presetInterrupt));
218+
assertThat(interrupted[plan.id], equalTo(plan.presetInterrupt));
219+
}
220+
assertThat(invokeTimes.longValue(),
221+
equalTo(Arrays.stream(plans).filter(p -> p.exceptBeforeCancel == false && p.exitBeforeCancel == false).count()));
222+
if (throwInOnCancel) {
223+
expectThrows(ThrowOnCancelException.class, cancellableThreads::checkForCancel);
224+
} else {
225+
expectThrows(ExecutionCancelledException.class, cancellableThreads::checkForCancel);
195226
}
227+
assertThat(invokeTimes.longValue(),
228+
equalTo(Arrays.stream(plans).filter(p -> p.exceptBeforeCancel == false && p.exitBeforeCancel == false).count() + 1));
196229
}
197230

198231
}

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.ExceptionsHelper;
3636
import org.elasticsearch.Version;
3737
import org.elasticsearch.action.ActionListener;
38+
import org.elasticsearch.action.support.PlainActionFuture;
3839
import org.elasticsearch.cluster.metadata.IndexMetaData;
3940
import org.elasticsearch.cluster.node.DiscoveryNode;
4041
import org.elasticsearch.common.UUIDs;
@@ -433,7 +434,11 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
433434
}
434435

435436
};
436-
expectThrows(IndexShardRelocatedException.class, handler::recoverToTarget);
437+
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
438+
expectThrows(IndexShardRelocatedException.class, () -> {
439+
handler.recoverToTarget(future);
440+
future.actionGet();
441+
});
437442
assertFalse(phase1Called.get());
438443
assertFalse(prepareTargetForTranslogCalled.get());
439444
assertFalse(phase2Called.get());

0 commit comments

Comments
 (0)