Skip to content

Simplify Blobstore Consistency Check in Tests (#73992) #74045

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;

Expand Down Expand Up @@ -140,7 +139,7 @@ public void testSimpleWorkflow() {
assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
final BlobStoreRepository repo =
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repo);
}

public void testMissingUri() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
logger.info("--> cleanup repository");
client().admin().cluster().prepareCleanupRepository(repoName).get();

BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
BlobStoreTestUtil.assertConsistency(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,11 @@ public void verifyReposThenStopServices() {

runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L));

BlobStoreTestUtil.assertConsistency(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
Runnable::run
final PlainActionFuture<AssertionError> future = BlobStoreTestUtil.assertConsistencyAsync(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
);
deterministicTaskQueue.runAllRunnableTasks();
assertNull(future.actionGet(0));
} finally {
testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void testCleanup() throws Exception {
logger.info("--> deleting a snapshot to trigger repository cleanup");
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();

BlobStoreTestUtil.assertConsistency(repo, genericExec);
BlobStoreTestUtil.assertConsistency(repo);

logger.info("--> Create dangling index");
createDanglingIndex(repo, genericExec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.ByteArrayInputStream;
Expand All @@ -58,7 +56,6 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -82,23 +79,25 @@

public final class BlobStoreTestUtil {

public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
final BlobStoreRepository repo =
(BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
}

/**
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
* @param repository BlobStoreRepository to check
* @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
* implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
* of this assertion must pass an executor on those when using such an implementation.
*/
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
executor.execute(ActionRunnable.supply(listener, () -> {
public static void assertConsistency(BlobStoreRepository repository) {
final PlainActionFuture<AssertionError> listener = assertConsistencyAsync(repository);
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
if (err != null) {
throw new AssertionError(err);
}
}

/**
* Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking.
*/
public static PlainActionFuture<AssertionError> assertConsistencyAsync(BlobStoreRepository repository) {
final PlainActionFuture<AssertionError> future = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> {
try {
final BlobContainer blobContainer = repository.blobContainer();
final long latestGen;
Expand All @@ -117,15 +116,12 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
return null;
listener.onResponse(null);
} catch (AssertionError e) {
return e;
listener.onResponse(e);
}
}));
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
if (err != null) {
throw new AssertionError(err);
}
return future;
}

private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void assertRepoConsistency() {
clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
clusterAdmin().prepareCleanupRepository(name).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name));
});
} else {
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,21 @@
import joptsimple.OptionSet;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.s3.S3RepositoryPlugin;

import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;

public class S3CleanupTests extends AbstractCleanupTests {

@Override
protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetadata> blobs)
throws Exception {
assertBusy(() -> super.assertBlobsByPrefix(repository, path, prefix, blobs), 10, TimeUnit.MINUTES);
}

@Override
protected void assertCorruptionVisible(BlobStoreRepository repo, Map<String, Set<String>> indexToFiles) throws Exception {
assertBusy(() -> super.assertCorruptionVisible(repo, indexToFiles), 10, TimeUnit.MINUTES);
}

@Override
protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception {
assertBusy(() -> super.assertConsistency(repo, executor), 10, TimeUnit.MINUTES);
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(S3RepositoryPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,18 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Executor;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -38,19 +34,6 @@ public abstract class AbstractCleanupTests extends ESSingleNodeTestCase {

protected BlobStoreRepository repository;

protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetadata> blobs)
throws Exception {
BlobStoreTestUtil.assertBlobsByPrefix(repository, path, prefix, blobs);
}

protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception {
BlobStoreTestUtil.assertConsistency(repo, executor);
}

protected void assertCorruptionVisible(BlobStoreRepository repo, Map<String, Set<String>> indexToFiles) throws Exception {
BlobStoreTestUtil.assertCorruptionVisible(repo, indexToFiles);
}

@Override
public void setUp() throws Exception {
super.setUp();
Expand All @@ -64,7 +47,7 @@ private void cleanupRepository(BlobStoreRepository repository) throws Exception
repository.threadPool().generic().execute(ActionRunnable.run(future,
() -> repository.blobStore().blobContainer(repository.basePath()).delete()));
future.actionGet();
assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap());
BlobStoreTestUtil.assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap());
}

@Override
Expand Down Expand Up @@ -191,7 +174,7 @@ public void testCleanup() throws Throwable {
assertThat(terminal.getOutput(), containsString("Set of deletion candidates is empty. Exiting"));

logger.info("--> check that there is no inconsistencies after running the tool");
assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repository);

logger.info("--> create several dangling indices");
int numOfFiles = 0;
Expand All @@ -211,7 +194,7 @@ public void testCleanup() throws Throwable {
Set<String> danglingIndices = indexToFiles.keySet();

logger.info("--> ensure dangling index folders are visible");
assertCorruptionVisible(repository, indexToFiles);
BlobStoreTestUtil.assertCorruptionVisible(repository, indexToFiles);

logger.info("--> execute cleanup tool, corruption is created latter than snapshot, there is nothing to cleanup");
terminal = executeCommand(false);
Expand Down Expand Up @@ -258,7 +241,7 @@ public void testCleanup() throws Throwable {
containsString("Total bytes freed: " + size));

logger.info("--> verify that there is no inconsistencies");
assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repository);
logger.info("--> perform cleanup by removing snapshots");
assertTrue(client().admin()
.cluster()
Expand Down