[Segment Replication] Update RefreshPolicy.WAIT_UNTIL for replica shards with segment replication enabled to wait for replica refresh (#6464)

* Initial draft PR for wait_until with segrep

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor code and fix test failures.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* add comments and fix tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor code, address comments and fix test failures.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Apply spotless check

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments and add integ test.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments and fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fixing failing test.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Remove unused code.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Addressing comments and refactoring

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add a limit on the number of refresh listeners a replica shard can hold, and force a refresh when the limit is reached.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Changing assert message

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix call to release refresh listeners on replica shards.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix call to release refresh listeners on replica shards.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fixing compile errors.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Apply spotless formatting

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
(cherry picked from commit e8a4210)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
github-actions[bot] committed Mar 10, 2023
1 parent fb0d7d7 commit d48861c
Showing 15 changed files with 384 additions and 45 deletions.
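In short: with segment replication enabled, a replica shard does not make documents searchable by indexing them locally; they only become visible once the replica refreshes on replicated segments. This commit therefore carries the max sequence number of each replicated bulk to the replica and releases RefreshPolicy.WAIT_UNTIL requests only after the replica's processed local checkpoint has caught up following a refresh, forcing a refresh if too many listeners pile up. A minimal usage sketch of the behavior under test below; the index name, document id, and Client handle are illustrative assumptions, not part of this commit:

import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.rest.RestStatus;

// Assumes an existing Client and a segment-replication-enabled index named "test-index".
public final class WaitUntilExample {
    public static void indexAndWait(Client client) {
        IndexResponse response = client.prepareIndex("test-index")
            .setId("1")
            .setSource("field", "value")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
            // With this change the call returns only after the document is searchable
            // on the primary and, with segment replication, on the replicas as well.
            .get();
        assert response.status() == RestStatus.CREATED;
    }
}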
@@ -9,6 +9,9 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -20,15 +23,18 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
@@ -578,4 +584,99 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testWaitUntilRefresh() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(6000, 7000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
assertBusy(() -> {
assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
assertEquals(primaryShard.getProcessedLocalCheckpoint(), replicaShard.getProcessedLocalCheckpoint());
}, 1, TimeUnit.MINUTES);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
}

public void testWaitUntilWhenReplicaPromoted() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final CountDownLatch waitForReplication = new CountDownLatch(1);
// Mock the transport service to throw a corruption exception during the segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
try {
waitForReplication.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new OpenSearchCorruptionException("expected");
}
connection.sendRequest(requestId, action, request, options);
}
);
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(700, 5000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNode);
assertNotNull(replicaShardRouting);
waitForReplication.countDown();
assertBusy(() -> {
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
int successfulDocCount = 0;
for (ActionFuture<IndexResponse> response : pendingIndexResponses) {
try {
IndexResponse indexResponse = response.actionGet();
successfulDocCount++;
} catch (Exception e) {
logger.trace("Failed to index Doc", e);
}
}
assertTrue(
client(replicaNode).prepareSearch(INDEX_NAME)
.setPreference("_only_local")
.setSize(0)
.get()
.getHits()
.getTotalHits().value >= successfulDocCount
);

}

}
@@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse(
@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
final Tuple<Translog.Location, Long> tuple = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, tuple.v1(), tuple.v2(), null, replica, logger);
});
}

@@ -790,8 +790,10 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public static Tuple<Translog.Location, Long> performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final boolean isSegRepEnabled = replica.indexSettings().isSegRepEnabled();
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
@@ -813,17 +815,23 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
primaryTerm,
response.getFailure().getMessage()
);
if (isSegRepEnabled) {
maxSeqNo = Math.max(response.getFailure().getSeqNo(), maxSeqNo);
}
} else {
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
if (isSegRepEnabled) {
maxSeqNo = Math.max(response.getResponse().getSeqNo(), maxSeqNo);
}
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
return new Tuple<>(location, maxSeqNo);
}

private static Engine.Result performOpOnReplica(
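The hunk above is the core write-path change: a segment replication replica cannot tie wait_until to a translog location, since documents become searchable there via replicated segments rather than local indexing, so performOnReplica now also reports the highest sequence number seen in the bulk. An illustrative, self-contained helper showing that computation follows; it mirrors the loop above but is not code from this PR:

import org.opensearch.action.bulk.BulkItemRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.index.seqno.SequenceNumbers;

// Illustrative only: compute the max sequence number of a replicated bulk the same
// way performOnReplica does when segment replication is enabled.
final class ReplicaMaxSeqNo {
    static long maxSeqNoOf(BulkShardRequest request) {
        long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
        for (BulkItemRequest item : request.items()) {
            BulkItemResponse response = item.getPrimaryResponse();
            long seqNo = response.isFailed()
                ? response.getFailure().getSeqNo()    // failed ops still consume a seq no
                : response.getResponse().getSeqNo();  // successful ops report their seq no
            maxSeqNo = Math.max(maxSeqNo, seqNo);
        }
        return maxSeqNo;
    }
}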
@@ -169,7 +169,7 @@ protected void dispatchedShardOperationOnReplica(
) {
ActionListener.completeWith(listener, () -> {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
return new WriteReplicaResult<>(request, location, null, null, replica, logger);
});
}

@@ -307,7 +307,7 @@ public void runPostReplicationActions(ActionListener<Void> listener) {
* We call this after replication because this might wait for a refresh and that can take a while.
* This way we wait for the refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() {
new AsyncAfterWriteAction(primary, replicaRequest, location, null, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
@@ -329,20 +329,23 @@ public void onFailure(Exception ex) {
* @opensearch.internal
*/
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>> extends ReplicaResult {
public final Location location;
private final Location location;
private final ReplicaRequest request;
private final IndexShard replica;
private final Logger logger;
private final Long maxSeqNo;

public WriteReplicaResult(
ReplicaRequest request,
@Nullable Location location,
@Nullable final Translog.Location location,
@Nullable final Long maxSeqNo,
@Nullable Exception operationFailure,
IndexShard replica,
Logger logger
) {
super(operationFailure);
this.location = location;
this.maxSeqNo = maxSeqNo;
this.request = request;
this.replica = replica;
this.logger = logger;
@@ -353,7 +356,7 @@ public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
listener.onResponse(null);
@@ -403,7 +406,6 @@ interface RespondingWriteResult {
* @opensearch.internal
*/
static final class AsyncAfterWriteAction {
private final Location location;
private final boolean waitUntilRefresh;
private final boolean sync;
private final AtomicInteger pendingOps = new AtomicInteger(1);
@@ -414,10 +416,15 @@ static final class AsyncAfterWriteAction {
private final WriteRequest<?> request;
private final Logger logger;

private final Location location;

private final Long maxSeqNo;

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@Nullable final Translog.Location location,
@Nullable final Long maxSeqNo,
final RespondingWriteResult respond,
final Logger logger
) {
@@ -443,6 +450,7 @@ static final class AsyncAfterWriteAction {
this.waitUntilRefresh = waitUntilRefresh;
this.respond = respond;
this.location = location;
this.maxSeqNo = maxSeqNo;
if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) {
pendingOps.incrementAndGet();
}
@@ -474,7 +482,7 @@ void run() {
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
indexShard.addRefreshListener(location, maxSeqNo, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
}
@@ -346,11 +346,22 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
maybeRefresh(source);
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
return false;
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}

@Override
@@ -51,6 +51,10 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
Objects.requireNonNull(referenceToRefresh);
// checks if an actual refresh (change in segments) happened
if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) {
return null;
}
final List<LeafReader> subs = new ArrayList<>();
final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
@@ -202,7 +202,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, getLogger());
return new WriteReplicaResult<>(request, null, null, null, replica, getLogger());
});
}

18 changes: 12 additions & 6 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -4002,7 +4002,8 @@ private RefreshListeners buildRefreshListeners() {
() -> refresh("too_many_listeners"),
logger,
threadPool.getThreadContext(),
externalRefreshMetric
externalRefreshMetric,
this::getProcessedLocalCheckpoint
);
}

@@ -4145,7 +4146,7 @@ public final void awaitShardSearchActive(Consumer<Boolean> listener) {
markSearcherAccessed(); // move the shard into non-search idle
final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
addRefreshListener(location, (b) -> {
addRefreshListener(location, null, (b) -> {
pendingRefreshLocation.compareAndSet(location, null);
listener.accept(true);
});
@@ -4155,13 +4156,14 @@ public final void awaitShardSearchActive(Consumer<Boolean> listener) {
}

/**
* Add a listener for refreshes.
* Add a listener for refreshes. On segment-replication-enabled replica shards the listener waits on maxSeqNo; in all other cases it waits on the translog location.
*
* @param location the location to listen for
* @param location the translog location to listen for on a refresh
* @param maxSeqNo the Sequence Number to listen for on a refresh
* @param listener for the refresh. Called with true if registering the listener ran it out of slots and forced a refresh. Called with
* false otherwise.
*/
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consumer<Boolean> listener) {
final boolean readAllowed;
if (isReadAllowed()) {
readAllowed = true;
@@ -4174,7 +4176,11 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
}
}
if (readAllowed) {
refreshListeners.addOrNotify(location, listener);
if (indexSettings.isSegRepEnabled() && shardRouting.primary() == false) {
refreshListeners.addOrNotify(maxSeqNo, listener);
} else {
refreshListeners.addOrNotify(location, listener);
}
} else {
// we're not yet ready for reads, just ignore refresh cycles
listener.accept(false);
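Tying it together: on a segment replication replica the refresh listener is keyed on maxSeqNo rather than a translog location, it is released once a refresh brings the processed local checkpoint up to that sequence number, and a refresh is forced when the listener slots run out (the refresh("too_many_listeners") and getProcessedLocalCheckpoint wiring in buildRefreshListeners above). A rough, self-contained sketch of that idea; the real logic lives in org.opensearch.index.shard.RefreshListeners and this is not the actual implementation:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Rough sketch only: a seq-no-keyed refresh listener registry as used conceptually on
// segment replication replicas.
final class SeqNoRefreshListeners {
    private static final class Waiter {
        final long seqNo;
        final Consumer<Boolean> listener;
        Waiter(long seqNo, Consumer<Boolean> listener) {
            this.seqNo = seqNo;
            this.listener = listener;
        }
    }

    private final LongSupplier processedLocalCheckpoint; // e.g. IndexShard::getProcessedLocalCheckpoint
    private final int maxListeners;                      // e.g. index.max_refresh_listeners
    private final Runnable forceRefresh;                 // e.g. () -> refresh("too_many_listeners")
    private final List<Waiter> waiters = new ArrayList<>();

    SeqNoRefreshListeners(LongSupplier processedLocalCheckpoint, int maxListeners, Runnable forceRefresh) {
        this.processedLocalCheckpoint = processedLocalCheckpoint;
        this.maxListeners = maxListeners;
        this.forceRefresh = forceRefresh;
    }

    // Register a wait_until listener keyed on the replicated bulk's max seq no. The
    // listener receives true only if registering it forced a refresh.
    synchronized void addOrNotify(long maxSeqNo, Consumer<Boolean> listener) {
        if (processedLocalCheckpoint.getAsLong() >= maxSeqNo) {
            listener.accept(false);   // already visible, notify immediately
        } else if (waiters.size() >= maxListeners) {
            forceRefresh.run();       // out of slots, force a refresh instead of queueing
            listener.accept(true);
        } else {
            waiters.add(new Waiter(maxSeqNo, listener));
        }
    }

    // Called after each refresh on the replica to release satisfied waiters.
    synchronized void afterRefresh() {
        final long checkpoint = processedLocalCheckpoint.getAsLong();
        for (Iterator<Waiter> it = waiters.iterator(); it.hasNext();) {
            final Waiter w = it.next();
            if (w.seqNo <= checkpoint) {
                w.listener.accept(false);
                it.remove();
            }
        }
    }
}

The integration test testWaitUntilRefresh at the top of this diff exercises exactly this path: it asserts that the replica's processed local checkpoint catches up to the primary's before the pending WAIT_UNTIL responses complete.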
