Initial draft PR for wait_until with segrep
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Rishikesh1159 committed Feb 23, 2023
1 parent 01466a6 commit 055f225
Showing 8 changed files with 304 additions and 52 deletions.
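
Context for this change: with segment replication (segrep), a replica shard no longer refreshes on its own indexed operations; it becomes searchable only once it applies a checkpoint of segments copied from the primary. A translog location on the replica therefore says nothing about search visibility, so this commit threads the maximum sequence number of a bulk request through the replica write path and keys refresh=wait_until listeners on it instead. The user-facing contract is unchanged. A minimal sketch of that contract, assuming a connected Client handle (over REST this is the refresh=wait_for query parameter); class and index names are illustrative:

import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;

// Illustrative sketch of the wait_until contract; class and index names are made up.
public final class WaitUntilExample {
    static IndexResponse indexAndWaitUntilSearchable(Client client) {
        IndexRequest request = new IndexRequest("my-index").id("1")
            .source("field", "value")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        // blocks until a refresh (or, on segrep replicas after this change, an applied
        // checkpoint) makes the document searchable, not merely durable
        return client.index(request).actionGet();
    }
}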
@@ -9,6 +9,8 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -20,15 +22,18 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
@@ -515,4 +520,46 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testWaitUntil() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
prepareCreate(
INDEX_NAME,
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
// keep the refresh interval short so wait_until requests complete quickly
.put("index.refresh_interval", "40ms")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(4000, 5000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
assertBusy(() -> {
    assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
}, 1, TimeUnit.MINUTES);

assertEquals(primaryShard.getLatestReplicationCheckpoint().getSeqNo(), replicaShard.getLatestReplicationCheckpoint().getSeqNo());

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
}
}
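
For contrast, a sketch of the explicit-refresh pattern that WAIT_UNTIL makes unnecessary, reusing the helpers from the test above (refresh(...) and assertHitCount(...) come from the OpenSearch integration-test base classes). This variant is illustrative and not part of the PR; note that with segment replication an explicit refresh on the primary does not by itself guarantee the replica has applied the matching checkpoint, which is part of why the seqNo-keyed listener introduced below exists:

// Illustrative only: without WAIT_UNTIL, visibility must be forced by hand.
client().prepareIndex(INDEX_NAME).setId("0").setSource("field", "value0").get();
refresh(INDEX_NAME); // explicit refresh instead of a wait_until listener
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 1);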
@@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse(
@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
-    final Translog.Location location = performOnReplica(request, replica);
-    return new WriteReplicaResult<>(request, location, null, replica, logger);
final Tuple<Translog.Location, Long> tuple = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, tuple, replica, logger);
});
}

@@ -790,8 +790,9 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

-    public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public static Tuple<Translog.Location, Long> performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
@@ -822,8 +823,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
// track the highest seqNo applied on this replica; consumed by the seqNo-based wait_until listener
maxSeqNo = Math.max(maxSeqNo, response.getResponse().getSeqNo());
}
-    return location;
return new Tuple<>(location, maxSeqNo);
}

private static Engine.Result performOpOnReplica(
@@ -46,12 +46,14 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
@@ -333,6 +335,7 @@ public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteReq
private final ReplicaRequest request;
private final IndexShard replica;
private final Logger logger;
private long maxSeqNo;

public WriteReplicaResult(
ReplicaRequest request,
@@ -343,17 +346,23 @@ public WriteReplicaResult(
) {
super(operationFailure);
this.location = location;
this.maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
this.request = request;
this.replica = replica;
this.logger = logger;
}

public WriteReplicaResult(ReplicaRequest request, Tuple<Location, Long> tuple, IndexShard replica, Logger logger) {
this(request, tuple.v1(), null, replica, logger);
this.maxSeqNo = tuple.v2();
}

@Override
public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
-    new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
listener.onResponse(null);
@@ -414,6 +423,8 @@ static final class AsyncAfterWriteAction {
private final WriteRequest<?> request;
private final Logger logger;

private long maxSeqNo;

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@@ -450,6 +461,18 @@ static final class AsyncAfterWriteAction {
assert pendingOps.get() >= 0 && pendingOps.get() <= 3 : "pendingOps was: " + pendingOps.get();
}

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@Nullable final Translog.Location location,
final long maxSeqNo,
final RespondingWriteResult respond,
final Logger logger
) {
this(indexShard, request, location, respond, logger);
this.maxSeqNo = maxSeqNo;
}

/** Calls the response listener if all pending operations have returned; otherwise just decrements the pending ops counter. */
private void maybeFinish() {
final int numPending = pendingOps.decrementAndGet();
@@ -473,14 +496,25 @@ void run() {
// decrement pending by one, if there is nothing else to do we just respond with success
maybeFinish();
if (waitUntilRefresh) {
-    assert pendingOps.get() > 0;
-    indexShard.addRefreshListener(location, forcedRefresh -> {
-        if (forcedRefresh) {
-            logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
-        }
-        refreshed.set(forcedRefresh);
-        maybeFinish();
-    });
assert pendingOps.get() > 0;
if (indexShard.indexSettings().isSegRepEnabled()) {
    // with segment replication the replica becomes searchable via applied checkpoints,
    // so wait on the request's max seqNo rather than on a translog location
    indexShard.addRefreshListener(maxSeqNo, forcedRefresh -> {
        if (forcedRefresh) {
            logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
        }
        refreshed.set(forcedRefresh);
        maybeFinish();
    });
} else {
    indexShard.addRefreshListener(location, forcedRefresh -> {
        if (forcedRefresh) {
            logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
        }
        refreshed.set(forcedRefresh);
        maybeFinish();
    });
}
}
if (sync) {
assert pendingOps.get() > 0;
@@ -13,6 +13,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
@@ -306,7 +307,35 @@ public List<Segment> segments(boolean verbose) {
}

@Override
-    public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
try {
    // refresh does not need to hold the read lock: the ReferenceManager copes with the engine being closed mid-refresh
    if (store.tryIncRef()) {
        // increment the ref just to ensure nobody closes the store during a refresh
        try {
            // even though we maintain 2 managers we really do the heavy lifting only once;
            // the second refresh will only do the extra work of warming caches etc.
            ReferenceManager<OpenSearchDirectoryReader> referenceManager = getReferenceManager(SearcherScope.EXTERNAL);
            // it is intentional that we never refresh both internal / external together
            referenceManager.maybeRefresh();
        } finally {
            store.decRef();
        }
    }
} catch (AlreadyClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
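
The significance of this hunk: NRTReplicationEngine previously implemented refresh(String) as a no-op, so nothing on a segrep replica could cycle the external reader on demand; driving ReferenceManager.maybeRefresh() is what lets refresh listeners fire once copied segments are in place. The notification mechanism underneath is plain Lucene, shown here as a self-contained demo (standard Lucene API, not code from this PR):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.ByteBuffersDirectory;

public final class RefreshListenerDemo {
    public static void main(String[] args) throws Exception {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
             SearcherManager manager = new SearcherManager(writer, null)) {
            manager.addListener(new ReferenceManager.RefreshListener() {
                @Override
                public void beforeRefresh() {}

                @Override
                public void afterRefresh(boolean didRefresh) {
                    // this callback is where parked wait_until requests can be released
                    System.out.println("refreshed, new segments visible: " + didRefresh);
                }
            });
            writer.addDocument(new Document()); // not searchable yet
            manager.maybeRefresh();             // opens a new reader and fires afterRefresh(true)
        }
    }
}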
server/src/main/java/org/opensearch/index/shard/IndexShard.java (20 additions, 0 deletions)
@@ -4175,6 +4175,26 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
}
}

/**
 * Adds a refresh listener keyed on a sequence number rather than a translog location, used
 * when segment replication is enabled: the listener fires once a refresh has made all
 * operations up to and including {@code maxSeqNo} searchable on this shard.
 */
public void addRefreshListener(long maxSeqNo, Consumer<Boolean> listener) {
final boolean readAllowed;
if (isReadAllowed()) {
readAllowed = true;
} else {
// check again under postRecoveryMutex. this is important to create a happens-before relationship
// between the switch to POST_RECOVERY and the associated refresh. Otherwise we may respond
// to a listener before the refresh that contained that operation actually happened.
synchronized (postRecoveryMutex) {
readAllowed = isReadAllowed();
}
}
if (readAllowed) {
refreshListeners.addOrNotify(maxSeqNo, listener, this);
} else {
// we're not yet ready for reads, just ignore refresh cycles
listener.accept(false);
}
}
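
The RefreshListeners counterpart that refreshListeners.addOrNotify(maxSeqNo, listener, this) calls into is not shown in this rendering of the diff. By analogy with the existing Translog.Location-keyed listeners, a seqNo-keyed registry has to notify immediately if the requested seqNo is already searchable, and otherwise park the listener until a refresh (on a segrep replica, an applied checkpoint) advances the visible seqNo. The following is a hedged sketch of that behavior only; all names (SeqNoRefreshListeners, lastRefreshedSeqNo) are illustrative, the IndexShard argument is dropped, and the real class must also handle listener slot limits and forced refreshes:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only, not the PR's implementation.
final class SeqNoRefreshListeners {
    private static final long NO_OPS_PERFORMED = -2; // mirrors SequenceNumbers.NO_OPS_PERFORMED

    private static final class Entry {
        final long seqNo;
        final Consumer<Boolean> listener;

        Entry(long seqNo, Consumer<Boolean> listener) {
            this.seqNo = seqNo;
            this.listener = listener;
        }
    }

    private final List<Entry> pending = new ArrayList<>();
    private long lastRefreshedSeqNo = NO_OPS_PERFORMED; // max seqNo made searchable so far

    /** Fires the listener now if seqNo is already searchable, otherwise parks it. Returns true if fired. */
    synchronized boolean addOrNotify(long seqNo, Consumer<Boolean> listener) {
        if (seqNo <= lastRefreshedSeqNo) {
            listener.accept(false); // false: no forced refresh was needed
            return true;
        }
        pending.add(new Entry(seqNo, listener));
        return false;
    }

    /** Called after a refresh or, on a segrep replica, after a checkpoint is applied. */
    synchronized void afterRefresh(long refreshedUpToSeqNo) {
        lastRefreshedSeqNo = Math.max(lastRefreshedSeqNo, refreshedUpToSeqNo);
        for (Iterator<Entry> it = pending.iterator(); it.hasNext();) {
            Entry entry = it.next();
            if (entry.seqNo <= lastRefreshedSeqNo) {
                it.remove();
                entry.listener.accept(false);
            }
        }
    }
}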

/**
* Metrics updater for a refresh
*