Skip to content

Commit

Permalink
Fix flaky test NRTReplicationEngineTests.testPreserveLatestCommit (op…
Browse files Browse the repository at this point in the history
…ensearch-project#9378)

Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
mch2 authored and kaushalmahi12 committed Sep 12, 2023
1 parent f351d9e commit e268726
Showing 1 changed file with 26 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,8 @@ public void testRemoveExtraSegmentsOnStartup() throws Exception {
for (String file : extraSegments) {
assertTrue(replicaFiles.contains(file));
}
assertTrue(storeContainsAll(nrtEngineStore, extraSegments));
try (NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS)) {
replicaFiles = List.of(nrtEngineStore.directory().listAll());
for (String file : extraSegments) {
assertFalse(replicaFiles.contains(file));
}
assertUnreferenced(nrtEngine, extraSegments);
}
}
}
Expand All @@ -452,21 +448,12 @@ public void testPreserveLatestCommit() throws Exception {

final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
final Collection<String> lastCommittedFiles = lastCommittedSegmentInfos.files(true);
assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles));

// ensure segments and commit file are incref'd:
assertEquals(
"Segments_N is incref'd once",
1,
nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName())
);
// segments are incref'd twice because they are loaded on the reader.
assertRefCount(nrtEngine, lastCommittedSegmentInfos.files(false), 2);
assertRefCounted(nrtEngine, lastCommittedFiles);

// get and close a snapshot - this will decref files when closed.
final GatedCloseable<SegmentInfos> segmentInfosSnapshot = nrtEngine.getSegmentInfosSnapshot();
segmentInfosSnapshot.close();
assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles));
assertRefCounted(nrtEngine, lastCommittedFiles);

// index more docs and refresh the reader - this will incref/decref files again
indexOperations(nrtEngine, operations.subList(2, 4));
Expand All @@ -477,44 +464,44 @@ public void testPreserveLatestCommit() throws Exception {
// get the additional segments that are only on the reader - not part of a commit.
final Collection<String> readerOnlySegments = primaryInfos.files(false);
readerOnlySegments.removeAll(lastCommittedFiles);
assertRefCount(nrtEngine, readerOnlySegments, 1);

assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles));
assertEquals(
"Segments_N is incref'd once",
1,
nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName())
);
assertRefCount(nrtEngine, lastCommittedSegmentInfos.files(false), 2);
assertRefCounted(nrtEngine, readerOnlySegments);
// re-read the last commit from disk here in case the primary engine has flushed.
assertRefCounted(nrtEngine, nrtEngine.getLastCommittedSegmentInfos().files(true));

// flush the primary
engine.flush(true, true);
copySegments(engine.getLatestSegmentInfos().files(false), nrtEngine);
final Collection<String> latestPrimaryInfos = engine.getLatestSegmentInfos().files(false);
final Collection<String> mergedAwayFiles = nrtEngine.getLastCommittedSegmentInfos().files(false);
// remove files still part of latest commit.
mergedAwayFiles.removeAll(latestPrimaryInfos);
copySegments(latestPrimaryInfos, nrtEngine);
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
// after flush our segment_n is removed.
assertEquals(
"Segments_N is removed",
0,
nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName())
);
assertFalse(storeContainsAll(nrtEngineStore, lastCommittedFiles));
// after flush our original segment_n is removed but some segments may remain.
assertUnreferenced(nrtEngine, List.of(lastCommittedSegmentInfos.getSegmentsFileName()));
assertUnreferenced(nrtEngine, mergedAwayFiles);
// close the engine - ensure we preserved the last commit
final SegmentInfos infosBeforeClose = nrtEngine.getLatestSegmentInfos();
nrtEngine.close();
assertTrue(storeContainsAll(nrtEngineStore, infosBeforeClose.files(false)));
assertEquals(store.readLastCommittedSegmentsInfo().files(false), infosBeforeClose.files(false));
assertRefCounted(nrtEngine, infosBeforeClose.files(false));
}
}

private void assertRefCount(NRTReplicationEngine nrtEngine, Collection<String> files, int count) {
private void assertRefCounted(NRTReplicationEngine nrtEngine, Collection<String> files) throws IOException {
List<String> storeFiles = List.of(nrtEngine.store.directory().listAll());
for (String file : files) {
// refCount for our segments is 2 because they are still active on the reader
assertEquals(count, nrtEngine.replicaFileTracker.refCount(file));
assertTrue("Expected: " + file + " to be referenced", nrtEngine.replicaFileTracker.refCount(file) >= 1);
assertTrue(storeFiles.contains(file));
}
}

private boolean storeContainsAll(Store nrtEngineStore, Collection<String> lastCommittedFiles) throws IOException {
return List.of(nrtEngineStore.directory().listAll()).containsAll(lastCommittedFiles);
private void assertUnreferenced(NRTReplicationEngine nrtEngine, Collection<String> files) throws IOException {
List<String> storeFiles = List.of(nrtEngine.store.directory().listAll());
for (String file : files) {
// refCount for our segments is 2 because they are still active on the reader
assertEquals("Expected: " + file + " to be unreferenced", 0, nrtEngine.replicaFileTracker.refCount(file));
assertFalse(storeFiles.contains(file));
}
}

private void cleanAndCopySegmentsFromPrimary(NRTReplicationEngine nrtEngine) throws IOException {
Expand Down

0 comments on commit e268726

Please sign in to comment.