Skip to content

Make sure listener is resolved when file queue is cleared #89929

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/89929.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89929
summary: Make sure listener is resolved when file queue is cleared
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -2906,15 +2906,24 @@ private void doSnapshotShard(SnapshotShardContext context) {
allFilesUploadedListener.onResponse(Collections.emptyList());
return;
}
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, filesToSnapshot.size(), allFilesUploadedListener);
for (FileInfo fileInfo : filesToSnapshot) {
shardSnapshotTaskRunner.enqueueFileSnapshot(context, fileInfo, filesListener);
}
snapshotFiles(context, filesToSnapshot, allFilesUploadedListener);
} catch (Exception e) {
context.onFailure(e);
}
}

/**
 * Enqueues one file snapshot task for every entry that is in {@code filesToSnapshot} at the time of the call.
 * <p>
 * Tasks are not bound to a particular file up front: each one is handed {@code filesToSnapshot::poll} and therefore
 * takes whatever file is at the head of the queue when it is asked for one. All tasks share a single per-file
 * listener obtained from {@code fileQueueListener}.
 *
 * @param context                  the shard snapshot context the files belong to
 * @param filesToSnapshot          queue of files still to be uploaded
 * @param allFilesUploadedListener listener passed on to {@code fileQueueListener} together with the queue and the file count
 */
protected void snapshotFiles(
    SnapshotShardContext context,
    BlockingQueue<FileInfo> filesToSnapshot,
    ActionListener<Collection<Void>> allFilesUploadedListener
) {
    // Read the size exactly once so the listener and the number of enqueued tasks agree on the same count.
    final int fileCount = filesToSnapshot.size();
    final ActionListener<Void> perFileListener = fileQueueListener(filesToSnapshot, fileCount, allFilesUploadedListener);
    int remaining = fileCount;
    while (remaining-- > 0) {
        shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, perFileListener);
    }
}

private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
if (store.tryIncRef() == false) {
if (snapshotStatus.isAborted()) {
Expand Down Expand Up @@ -3415,7 +3424,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
* Snapshot individual file
* @param fileInfo file to snapshot
*/
private void snapshotFile(SnapshotShardContext context, FileInfo fileInfo) throws IOException {
protected void snapshotFile(SnapshotShardContext context, FileInfo fileInfo) throws IOException {
final IndexId indexId = context.indexId();
final Store store = context.store();
final ShardId shardId = store.shardId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

Expand Down Expand Up @@ -75,18 +76,23 @@ public String toString() {
}

class FileSnapshotTask extends SnapshotTask {
private final FileInfo fileInfo;
private final Supplier<FileInfo> fileInfos;
private final ActionListener<Void> fileSnapshotListener;

FileSnapshotTask(SnapshotShardContext context, FileInfo fileInfo, ActionListener<Void> fileSnapshotListener) {
FileSnapshotTask(SnapshotShardContext context, Supplier<FileInfo> fileInfos, ActionListener<Void> fileSnapshotListener) {
super(context);
this.fileInfo = fileInfo;
this.fileInfos = fileInfos;
this.fileSnapshotListener = fileSnapshotListener;
}

@Override
public void run() {
ActionRunnable.run(fileSnapshotListener, () -> fileSnapshotter.accept(context, fileInfo)).run();
ActionRunnable.run(fileSnapshotListener, () -> {
FileInfo fileInfo = fileInfos.get();
if (fileInfo != null) {
fileSnapshotter.accept(context, fileInfo);
}
}).run();
}

@Override
Expand All @@ -96,14 +102,7 @@ public int priority() {

@Override
public String toString() {
return getClass().getSimpleName()
+ "{snapshotID=["
+ context.snapshotId()
+ "], indexID=["
+ context.indexId()
+ "], file=["
+ fileInfo.name()
+ "]}";
return getClass().getSimpleName() + "{snapshotID=[" + context.snapshotId() + "], indexID=[" + context.indexId() + "]}";
}
}

Expand All @@ -123,8 +122,12 @@ public void enqueueShardSnapshot(final SnapshotShardContext context) {
taskRunner.enqueueTask(task);
}

public void enqueueFileSnapshot(final SnapshotShardContext context, final FileInfo fileInfo, final ActionListener<Void> listener) {
final FileSnapshotTask task = new FileSnapshotTask(context, fileInfo, listener);
/**
 * Creates a {@link FileSnapshotTask} for the given context and hands it to the task runner.
 * <p>
 * The supplier is not invoked here; it is stored in the task and consulted when the task runs.
 *
 * @param context   the shard snapshot context the file belongs to
 * @param fileInfos source of the file to snapshot
 * @param listener  listener passed to the task as its file snapshot listener
 */
public void enqueueFileSnapshot(
    final SnapshotShardContext context,
    final Supplier<FileInfo> fileInfos,
    final ActionListener<Void> listener
) {
    taskRunner.enqueueTask(new FileSnapshotTask(context, fileInfos, listener));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@
package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
Expand All @@ -32,6 +38,7 @@
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.SnapshotShardContext;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotState;
Expand All @@ -40,12 +47,15 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -393,4 +403,55 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
return repoData;
}

/**
 * Checks that the listener handed to {@code snapshotFiles} is completed even though some of the
 * individual file snapshot tasks fail.
 * <p>
 * The repository under test overrides {@code snapshotFile} so that it never uploads anything and
 * randomly throws an {@link IOException} instead.
 */
public void testEnsureUploadListenerIsResolvedWhenAFileSnapshotTaskFails() throws Exception {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final FsRepository repository = new FsRepository(
repositoryMetadata,
createEnvironment(),
xContentRegistry(),
clusterService,
MockBigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
}

@Override
protected void snapshotFile(SnapshotShardContext context, BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
// Randomly fail some file snapshot tasks
if (randomBoolean()) {
throw new IOException("cannot upload file");
}
}
};
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();
// Generate some FileInfo, as the files that get uploaded as part of the shard snapshot
SnapshotShardContext context = ShardSnapshotTaskRunnerTests.dummyContext();
int noOfFiles = randomIntBetween(10, 100);
// The queue capacity equals the number of files, so the add() calls below always have room.
BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files = new LinkedBlockingQueue<>(noOfFiles);
PlainActionFuture<Void> listenerCalled = PlainActionFuture.newFuture();
// NOTE(review): ActionListener.wrap(Runnable) presumably runs the callback on both success and failure,
// which is what lets the future complete regardless of the outcome - confirm against ActionListener.
ActionListener<Collection<Void>> allFilesUploadListener = ActionListener.wrap(() -> listenerCalled.onResponse(null));
for (int i = 0; i < noOfFiles; i++) {
files.add(ShardSnapshotTaskRunnerTests.dummyFileInfo());
}
repository.snapshotFiles(context, files, allFilesUploadListener);
// The only assertion: waiting on the future succeeds only once the listener has been invoked.
listenerCalled.get();
}

/**
 * Builds a test {@link Environment} whose home path is a newly created temporary directory and whose
 * repository path is the {@code repo} entry resolved beneath it.
 *
 * @return the environment created by {@link TestEnvironment#newEnvironment}
 */
private Environment createEnvironment() {
    final Path homeDir = createTempDir();
    final Settings.Builder envSettings = Settings.builder();
    envSettings.put(Environment.PATH_HOME_SETTING.getKey(), homeDir.toAbsolutePath());
    envSettings.put(Environment.PATH_REPO_SETTING.getKey(), homeDir.resolve("repo").toAbsolutePath());
    return TestEnvironment.newEnvironment(envSettings.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void snapshotShard(SnapshotShardContext context) {
filesToUpload
);
for (int i = 0; i < filesToUpload; i++) {
taskRunner.enqueueFileSnapshot(context, dummyFileInfo(), uploadListener);
taskRunner.enqueueFileSnapshot(context, ShardSnapshotTaskRunnerTests::dummyFileInfo, uploadListener);
}
}
finishedShardSnapshotTasks.incrementAndGet();
Expand All @@ -105,17 +105,17 @@ public int finishedShardSnapshotTasks() {
}
}

private static BlobStoreIndexShardSnapshot.FileInfo dummyFileInfo() {
/**
 * Creates a placeholder {@link BlobStoreIndexShardSnapshot.FileInfo} for tests.
 * <p>
 * A random ten-character name is used both as the file info name and as the name in its
 * {@link StoreFileMetadata}; the remaining metadata arguments are fixed values and the last
 * {@code FileInfo} argument is {@code null}.
 *
 * @return a new file info instance with a random name
 */
public static BlobStoreIndexShardSnapshot.FileInfo dummyFileInfo() {
    final String name = randomAlphaOfLength(10);
    return new BlobStoreIndexShardSnapshot.FileInfo(
        name,
        new StoreFileMetadata(name, 10, "CHECKSUM", Version.CURRENT.luceneVersion.toString()),
        null
    );
}

private SnapshotShardContext dummyContext() {
/**
 * Creates a placeholder {@link SnapshotShardContext} with a random snapshot id and a random start time.
 *
 * @return the context produced by {@link #dummyContext(SnapshotId, long)} for the generated values
 */
public static SnapshotShardContext dummyContext() {
    // Keep the draw order (name, UUID, start time) so seeded runs stay reproducible.
    final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
    final long startTime = randomMillisUpToYear9999();
    return dummyContext(snapshotId, startTime);
}

private SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime) {
public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime) {
IndexId indexId = new IndexId(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), 1);
Settings settings = Settings.builder()
Expand Down Expand Up @@ -177,17 +177,21 @@ public void testCompareToShardSnapshotTask() {
assertThat(workers.new ShardSnapshotTask(s2Context).compareTo(workers.new ShardSnapshotTask(s3Context)), equalTo(0));
// Shard snapshot task always has a higher priority over file snapshot
assertThat(
workers.new ShardSnapshotTask(s1Context).compareTo(workers.new FileSnapshotTask(s1Context, dummyFileInfo(), listener)),
workers.new ShardSnapshotTask(s1Context).compareTo(
workers.new FileSnapshotTask(s1Context, ShardSnapshotTaskRunnerTests::dummyFileInfo, listener)
),
lessThan(0)
);
assertThat(
workers.new ShardSnapshotTask(s2Context).compareTo(workers.new FileSnapshotTask(s1Context, dummyFileInfo(), listener)),
workers.new ShardSnapshotTask(s2Context).compareTo(
workers.new FileSnapshotTask(s1Context, ShardSnapshotTaskRunnerTests::dummyFileInfo, listener)
),
lessThan(0)
);
// File snapshots are prioritized by start time.
assertThat(
workers.new FileSnapshotTask(s1Context, dummyFileInfo(), listener).compareTo(
workers.new FileSnapshotTask(s2Context, dummyFileInfo(), listener)
workers.new FileSnapshotTask(s1Context, ShardSnapshotTaskRunnerTests::dummyFileInfo, listener).compareTo(
workers.new FileSnapshotTask(s2Context, ShardSnapshotTaskRunnerTests::dummyFileInfo, listener)
),
lessThan(0)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ public static String randomPositiveTimeValue() {
/**
* generate a random epoch millis in a range 1 to 9999-12-31T23:59:59.999
*/
public long randomMillisUpToYear9999() {
public static long randomMillisUpToYear9999() {
    // The range starts at 1, so an epoch-millis value of 0 is never requested.
    final long latestMillis = DateUtils.MAX_MILLIS_BEFORE_9999;
    return randomLongBetween(1, latestMillis);
}

Expand Down