Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
mingshl committed Jan 7, 2023
2 parents c61d387 + 85f4149 commit 52626cf
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void ingestDocs(int docCount) throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -521,7 +522,6 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down Expand Up @@ -623,6 +623,7 @@ public void testDeleteOperations() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testUpdateOperations() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void testPrimaryRelocation() throws Exception {
* failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the
* replicas.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryRelocationWithSegRepFailure() throws Exception {
final String oldPrimary = internalCluster().startNode(featureFlagSettings());
createIndex();
Expand Down
50 changes: 41 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -1454,22 +1455,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
* Checks if this target shard should start a round of segment replication.
* @return {@code true} if the shard is able to perform segment replication, {@code false} otherwise.
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
public boolean isSegmentReplicationAllowed() {
if (indexSettings.isSegRepEnabled() == false) {
logger.warn("Attempting to perform segment replication when it is not enabled on the index");
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
logger.warn("Shard is in primary mode and cannot perform segment replication as a replica.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
logger.warn("Shard is marked as primary and cannot perform segment replication as a replica");
return false;
}
if (state().equals(IndexShardState.STARTED) == false
&& (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(
() -> new ParameterizedMessage(
"Shard is not started or recovering {} {} and cannot perform segment replication as a replica",
state(),
shardRouting.state()
)
);
return false;
}
if (getReplicationEngine().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Shard does not have the correct engine type to perform segment replication {}.",
getEngine().getClass()
)
);
return false;
}
return true;
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
Expand Down Expand Up @@ -811,11 +810,7 @@ private void forceSegmentReplication(
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null
&& indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public void setUp() throws Exception {
Metadata metadata = Metadata.builder().build();
ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.nodes(clusterService.state().nodes())
.build();
setState(clusterService, clusterState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
Expand Down Expand Up @@ -97,6 +98,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

/**
 * Asserts that {@code isSegmentReplicationAllowed()} returns {@code false} for a shard that was
 * created with an {@link InternalEngineFactory}.
 *
 * NOTE(review): the leading {@code false} passed to {@code newShard} presumably selects a
 * non-primary (replica) shard - confirm against the test base class.
 *
 * @throws IOException if creating or closing the shard fails
 */
public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
    final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory());
    try {
        assertFalse(indexShard.isSegmentReplicationAllowed());
    } finally {
        // Close the shard even when the assertion fails so a failing test does not leave it open.
        closeShards(indexShard);
    }
}

public void testSegmentReplication_Index_Update_Delete() throws Exception {
String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
Expand Down

0 comments on commit 52626cf

Please sign in to comment.