Skip to content

Commit

Permalink
Fix flaky SegRep test testScrollCreatedOnReplica (opensearch-project#…
Browse files Browse the repository at this point in the history
…12077)

* Fix flaky test testScrollCreatedOnReplica

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* Disable scheduled refresh

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* Clean up segment collection assertions

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* Fix spotless

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

---------

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 authored Feb 21, 2024
1 parent bf050d4 commit 247e2ee
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.indices.replication;

import org.apache.lucene.index.SegmentInfos;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -16,6 +17,8 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
Expand All @@ -27,12 +30,14 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -230,4 +235,14 @@ protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Ex
}
}, 30, TimeUnit.SECONDS);
}

/**
* Returns the latest SIS for a shard but does not incref the segments.
*/
protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOException {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = shard.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> closeable = tuple.v1()) {
return closeable.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -73,7 +72,6 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchService;
Expand All @@ -92,6 +90,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1053,32 +1053,31 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
public void testScrollCreatedOnReplica() throws Exception {
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
prepareCreate(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
// we want to control refreshes
.put("index.refresh_interval", -1)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(0))
.setSource(jsonBuilder().startObject().field("field", 0).endObject())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);

assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = replicaShard.getLatestSegmentInfosAndCheckpoint();
final Collection<String> snapshottedSegments;
try (final GatedCloseable<SegmentInfos> closeable = tuple.v1()) {
snapshottedSegments = closeable.get().files(false);
}

// opens a scrolled query before a flush is called.
// this is for testing scroll segment consistency between refresh and flush
SearchResponse searchResponse = client(replica).prepareSearch()
Expand All @@ -1092,31 +1091,40 @@ public void testScrollCreatedOnReplica() throws Exception {
.setScroll(TimeValue.timeValueDays(1))
.get();

// force call flush
flush(INDEX_NAME);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
SegmentInfos latestSegmentInfos = getLatestSegmentInfos(replicaShard);
final Set<String> snapshottedSegments = new HashSet<>(latestSegmentInfos.files(false));
logger.info("Segments {}", snapshottedSegments);

for (int i = 3; i < 5; i++) {
client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
// index more docs and force merge down to 1 segment
for (int i = 1; i < 5; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
// create new on-disk segments and copy them out.
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

// force merge and flush.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
// wait for replication to complete
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
logger.info("Local segments after force merge and commit {}", getLatestSegmentInfos(replicaShard).files(false));
List<String> filesBeforeClearScroll = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", filesBeforeClearScroll.containsAll(snapshottedSegments));

// Test stats
logger.info("--> Collect all scroll query hits");
long scrollHits = 0;
Expand All @@ -1125,20 +1133,23 @@ public void testScrollCreatedOnReplica() throws Exception {
searchResponse = client(replica).prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueDays(1)).get();
assertAllSuccessful(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
assertEquals(1, scrollHits);

client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

assertBusy(
() -> assertFalse(
"Files should be cleaned up post scroll clear request",
List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments)
)
final Set<String> filesAfterClearScroll = Arrays.stream(replicaShard.store().directory().listAll()).collect(Collectors.toSet());
// there should be no active readers, snapshots, or on-disk commits containing the snapshotted files, check that they have been
// deleted.
Set<String> latestCommitSegments = new HashSet<>(replicaShard.store().readLastCommittedSegmentsInfo().files(false));
assertEquals(
"Snapshotted files are no longer part of the latest commit",
Collections.emptySet(),
Sets.intersection(latestCommitSegments, snapshottedSegments)
);
assertEquals(
"All snapshotted files should be deleted",
Collections.emptySet(),
Sets.intersection(filesAfterClearScroll, snapshottedSegments)
);
assertEquals(10, scrollHits);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* This class is heavily influenced by Lucene's ReplicaFileDeleter class used to keep track of
Expand All @@ -31,10 +31,10 @@ final class ReplicaFileTracker {

public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class);
private final Map<String, Integer> refCounts = new HashMap<>();
private final BiConsumer<String, String> fileDeleter;
private final Consumer<String> fileDeleter;
private final Set<String> EXCLUDE_FILES = Set.of("write.lock");

public ReplicaFileTracker(BiConsumer<String, String> fileDeleter) {
public ReplicaFileTracker(Consumer<String> fileDeleter) {
this.fileDeleter = fileDeleter;
}

Expand Down Expand Up @@ -82,7 +82,7 @@ private synchronized void delete(Collection<String> toDelete) {

private synchronized void delete(String fileName) {
assert canDelete(fileName);
fileDeleter.accept("delete unreferenced", fileName);
fileDeleter.accept(fileName);
}

private synchronized boolean canDelete(String fileName) {
Expand Down

0 comments on commit 247e2ee

Please sign in to comment.