Skip to content

Commit 78e5cbb

Browse files
Implement Eventually Consistent Mock Repository for SnapshotResiliencyTests (#40893)
* Add eventually consistent mock repository for reproducing and testing AWS S3 blob store behavior * Relates #38941
1 parent 6d70276 commit 78e5cbb

File tree

4 files changed

+556
-17
lines changed

4 files changed

+556
-17
lines changed

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
129129

130130
private static final int BUFFER_SIZE = 4096;
131131

132-
private static final String SNAPSHOT_PREFIX = "snap-";
132+
public static final String SNAPSHOT_PREFIX = "snap-";
133133

134-
private static final String SNAPSHOT_CODEC = "snapshot";
134+
public static final String SNAPSHOT_CODEC = "snapshot";
135135

136136
private static final String INDEX_FILE_PREFIX = "index-";
137137

138-
private static final String INDEX_LATEST_BLOB = "index.latest";
138+
public static final String INDEX_LATEST_BLOB = "index.latest";
139139

140140
private static final String TESTS_FILE = "tests-";
141141

@@ -180,7 +180,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
180180

181181
private final ChecksumBlobStoreFormat<IndexMetaData> indexMetaDataFormat;
182182

183-
private final ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;
183+
protected final ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;
184184

185185
private final boolean readOnly;
186186

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.cluster.service.ClusterApplierService;
109109
import org.elasticsearch.cluster.service.ClusterService;
110110
import org.elasticsearch.cluster.service.MasterService;
111+
import org.elasticsearch.common.Nullable;
111112
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
112113
import org.elasticsearch.common.network.NetworkModule;
113114
import org.elasticsearch.common.settings.ClusterSettings;
@@ -152,6 +153,7 @@
152153
import org.elasticsearch.search.SearchService;
153154
import org.elasticsearch.search.builder.SearchSourceBuilder;
154155
import org.elasticsearch.search.fetch.FetchPhase;
156+
import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository;
155157
import org.elasticsearch.test.ESTestCase;
156158
import org.elasticsearch.test.disruption.DisruptableMockTransport;
157159
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -203,16 +205,28 @@ public class SnapshotResiliencyTests extends ESTestCase {
203205

204206
private Path tempDir;
205207

208+
/**
209+
* Context shared by all the node's {@link Repository} instances if the eventually consistent blobstore is to be used.
210+
* {@code null} if not using the eventually consistent blobstore.
211+
*/
212+
@Nullable private MockEventuallyConsistentRepository.Context blobStoreContext;
213+
206214
@Before
207215
public void createServices() {
208216
tempDir = createTempDir();
217+
if (randomBoolean()) {
218+
blobStoreContext = new MockEventuallyConsistentRepository.Context();
219+
}
209220
deterministicTaskQueue =
210221
new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random());
211222
}
212223

213224
@After
214225
public void verifyReposThenStopServices() {
215226
try {
227+
if (blobStoreContext != null) {
228+
blobStoreContext.forceConsistent();
229+
}
216230
BlobStoreTestUtil.assertConsistency(
217231
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
218232
Runnable::run);
@@ -900,19 +914,7 @@ public void onFailure(final Exception e) {
900914
final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
901915
repositoriesService = new RepositoriesService(
902916
settings, clusterService, transportService,
903-
Collections.singletonMap(FsRepository.TYPE, metaData -> {
904-
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
905-
@Override
906-
protected void assertSnapshotOrGenericThread() {
907-
// eliminate thread name check as we create repo in the test thread
908-
}
909-
};
910-
repository.start();
911-
return repository;
912-
}
913-
),
914-
emptyMap(),
915-
threadPool
917+
Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool
916918
);
917919
snapshotsService =
918920
new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
@@ -1077,6 +1079,28 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon
10771079
client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
10781080
}
10791081

1082+
private Repository.Factory getRepoFactory(Environment environment) {
1083+
// Run half the tests with the eventually consistent repository
1084+
if (blobStoreContext == null) {
1085+
return metaData -> {
1086+
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
1087+
@Override
1088+
protected void assertSnapshotOrGenericThread() {
1089+
// eliminate thread name check as we create repo in the test thread
1090+
}
1091+
};
1092+
repository.start();
1093+
return repository;
1094+
};
1095+
} else {
1096+
return metaData -> {
1097+
final Repository repository = new MockEventuallyConsistentRepository(
1098+
metaData, environment, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
1099+
repository.start();
1100+
return repository;
1101+
};
1102+
}
1103+
}
10801104
public void restart() {
10811105
testClusterNodes.disconnectNode(this);
10821106
final ClusterState oldState = this.clusterService.state();

0 commit comments

Comments
 (0)