Skip to content

Make recovery source partially non-blocking #37291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jan 12, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.common.util;

import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
Expand All @@ -38,34 +39,35 @@ public class CancellableThreads {
private final Set<Thread> threads = new HashSet<>();
// needs to be volatile as it is also read outside of synchronized blocks.
private volatile boolean cancelled = false;
private final SetOnce<OnCancel> onCancel = new SetOnce<>();
private String reason;

/**
 * Returns the current value of the {@code cancelled} flag.
 *
 * NOTE(review): the flag is declared {@code volatile} because it is also read outside of synchronized blocks,
 * so the monitor taken here is presumably not needed for visibility - confirm before relaxing it.
 *
 * @return {@code true} if the flag is set, {@code false} otherwise
 */
public synchronized boolean isCancelled() {
return cancelled;
}


/** Calling this will throw an exception if the operation was cancelled.
* Register a callback via {@link #setOnCancel(OnCancel)} for custom failure logic */
public synchronized void checkForCancel() {
if (isCancelled()) {
onCancel(reason, null);
}
public void checkForCancel() {
checkForCancel(null);
}

/**
* called if {@link #checkForCancel()} was invoked after the operation was cancelled.
* the default implementation always throws an {@link ExecutionCancelledException}, suppressing
* any other exception that occurred before cancellation
* @param reason reason for failure supplied by the caller of {@link #cancel}
* @param suppressedException any error that was encountered during the execution before the operation was cancelled.
*/
protected void onCancel(String reason, @Nullable Exception suppressedException) {
RuntimeException e = new ExecutionCancelledException("operation was cancelled reason [" + reason + "]");
if (suppressedException != null) {
e.addSuppressed(suppressedException);
private void checkForCancel(Exception beforeCancelException) {
if (isCancelled()) {
final String reason;
final OnCancel onCancel;
synchronized (this) {
reason = this.reason;
onCancel = this.onCancel.get();
}
if (onCancel != null) {
onCancel.onCancel(reason, beforeCancelException);
}
// fallback to the default exception
final RuntimeException cancelExp = new ExecutionCancelledException("operation was cancelled reason [" + reason + "]");
if (beforeCancelException != null) {
cancelExp.addSuppressed(beforeCancelException);
}
throw cancelExp;
}
throw e;
}

private synchronized boolean add() {
Expand Down Expand Up @@ -125,17 +127,14 @@ public void executeIO(IOInterruptible interruptible) throws IOException {
// clear the interrupted flag as we are checking for failure.
Thread.interrupted();
}
synchronized (this) {
if (isCancelled()) {
onCancel(reason, ioException != null ? ioException : runtimeException);
} else if (ioException != null) {
// if we're not canceling, we throw the original exception
throw ioException;
}
if (runtimeException != null) {
// if we're not canceling, we throw the original exception
throw runtimeException;
}
checkForCancel(ioException != null ? ioException : runtimeException);
if (ioException != null) {
// if we're not canceling, we throw the original exception
throw ioException;
}
if (runtimeException != null) {
// if we're not canceling, we throw the original exception
throw runtimeException;
}
if (cancelledByExternalInterrupt) {
// restore interrupt flag to at least adhere to expected behavior
Expand Down Expand Up @@ -185,4 +184,26 @@ public ExecutionCancelledException(StreamInput in) throws IOException {
super(in);
}
}

/**
* Registers a callback that will be invoked when some running operations are cancelled or {@link #checkForCancel()} is called.
* The callback is stored in a {@link SetOnce} holder, so it is expected to be registered at most once per instance
* (a second registration is presumably rejected by the holder - verify against the Lucene {@code SetOnce} contract).
*
* @param onCancel the callback to register
*/
public synchronized void setOnCancel(OnCancel onCancel) {
this.onCancel.set(onCancel);
}

/**
* Callback hook for customizing how a cancellation is reported; registered through {@link #setOnCancel(OnCancel)}.
*/
@FunctionalInterface
public interface OnCancel {
/**
* Called when some running operations are cancelled or {@link #checkForCancel()} is explicitly called.
* If this method throws an exception, cancelling tasks will fail with that exception; otherwise they
* will fail with the default exception {@link ExecutionCancelledException}.
*
* @param reason the reason of the cancellation
* @param beforeCancelException any error that was encountered during the execution before the operations were cancelled.
* @see #checkForCancel()
* @see #setOnCancel(OnCancel)
*/
void onCancel(String reason, @Nullable Exception beforeCancelException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -81,7 +83,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException {
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());

Expand All @@ -101,18 +103,13 @@ private RecoveryResponse recover(final StartRecoveryRequest request) throws IOEx
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
request.targetNode());
try {
return handler.recoverToTarget();
} finally {
ongoingRecoveries.remove(shard, handler);
}
handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));
}

class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
recover(request, new HandledTransportAction.ChannelActionListener<>(channel, Actions.START_RECOVERY, request));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.io.IOException;
import java.util.List;

final class RecoveryResponse extends TransportResponse {
public final class RecoveryResponse extends TransportResponse {

final List<String> phase1FileNames;
final List<Long> phase1FileSizes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lease.Releasable;
Expand Down Expand Up @@ -70,6 +69,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -96,22 +96,7 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;

private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override
protected void onCancel(String reason, @Nullable Exception suppressedException) {
RuntimeException e;
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
} else {
e = new ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
}
if (suppressedException != null) {
e.addSuppressed(suppressedException);
}
throw e;
}
};
private final CancellableThreads cancellableThreads = new CancellableThreads();

public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
final StartRecoveryRequest request,
Expand All @@ -131,19 +116,37 @@ public StartRecoveryRequest getRequest() {
/**
* performs the recovery from the local engine to the target
*/
public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
if (targetShardRouting == null) {
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
request.targetNode());
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);

try (Closeable ignored = shard.acquireRetentionLockForPeerRecovery()) {
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
final List<Closeable> resources = new CopyOnWriteArrayList<>();
final Closeable releaseResources = () -> IOUtils.close(resources);
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
try {
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
final RuntimeException e;
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
} else {
e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
}
if (beforeCancelEx != null) {
e.addSuppressed(beforeCancelEx);
}
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
throw e;
});
runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
if (targetShardRouting == null) {
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
request.targetNode());
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
final Closeable retentionLock = shard.acquireRetentionLockForPeerRecovery();
resources.add(retentionLock);
final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
Expand Down Expand Up @@ -217,6 +220,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
final SendSnapshotResult sendSnapshotResult;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
Expand All @@ -229,10 +234,16 @@ public RecoveryResponse recoverToTarget() throws IOException {

finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
return new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
IOUtils.close(resources);
wrappedListener.onResponse(
new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.common.util.CancellableThreads.ExecutionCancelledException;
import static org.hamcrest.Matchers.equalTo;

public class CancellableThreadsTests extends ESTestCase {
public static class CustomException extends RuntimeException {
Expand All @@ -39,6 +44,8 @@ public IOCustomException(String msg) {
}
}

/**
 * Exception thrown from the test's {@code OnCancel} callback; used to check that an exception raised by the
 * callback is the one observed by cancelled tasks instead of the default {@code ExecutionCancelledException}.
 */
static class ThrowOnCancelException extends RuntimeException {
}

private class TestPlan {
public final int id;
Expand Down Expand Up @@ -167,6 +174,19 @@ public void testCancellableThreads() throws InterruptedException {
}

readyForCancel.await();
final boolean throwInOnCancel = randomBoolean();
final AtomicInteger invokeTimes = new AtomicInteger();
cancellableThreads.setOnCancel((reason, beforeCancelException) -> {
invokeTimes.getAndIncrement();
if (throwInOnCancel) {
ThrowOnCancelException e = new ThrowOnCancelException();
if (beforeCancelException != null) {
e.addSuppressed(beforeCancelException);
}
throw e;
}
});

cancellableThreads.cancel("test");
for (Thread thread : threads) {
thread.join(20000);
Expand All @@ -181,7 +201,11 @@ public void testCancellableThreads() throws InterruptedException {
assertNull(exceptions[i]);
} else {
// in all other cases, we expect a cancellation exception.
assertThat(exceptions[i], Matchers.instanceOf(CancellableThreads.ExecutionCancelledException.class));
if (throwInOnCancel) {
assertThat(exceptions[i], Matchers.instanceOf(ThrowOnCancelException.class));
} else {
assertThat(exceptions[i], Matchers.instanceOf(ExecutionCancelledException.class));
}
if (plan.exceptAfterCancel) {
assertThat(exceptions[i].getSuppressed(),
Matchers.arrayContaining(
Expand All @@ -191,8 +215,17 @@ public void testCancellableThreads() throws InterruptedException {
assertThat(exceptions[i].getSuppressed(), Matchers.emptyArray());
}
}
assertThat(interrupted[plan.id], Matchers.equalTo(plan.presetInterrupt));
assertThat(interrupted[plan.id], equalTo(plan.presetInterrupt));
}
assertThat(invokeTimes.longValue(),
equalTo(Arrays.stream(plans).filter(p -> p.exceptBeforeCancel == false && p.exitBeforeCancel == false).count()));
if (throwInOnCancel) {
expectThrows(ThrowOnCancelException.class, cancellableThreads::checkForCancel);
} else {
expectThrows(ExecutionCancelledException.class, cancellableThreads::checkForCancel);
}
assertThat(invokeTimes.longValue(),
equalTo(Arrays.stream(plans).filter(p -> p.exceptBeforeCancel == false && p.exitBeforeCancel == false).count() + 1));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -433,7 +434,11 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
}

};
expectThrows(IndexShardRelocatedException.class, handler::recoverToTarget);
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
expectThrows(IndexShardRelocatedException.class, () -> {
handler.recoverToTarget(future);
future.actionGet();
});
assertFalse(phase1Called.get());
assertFalse(prepareTargetForTranslogCalled.get());
assertFalse(phase2Called.get());
Expand Down
Loading