Skip to content

Commit

Permalink
[Weighted Shard Routing] Fail open requests on search shard failures (o…
Browse files Browse the repository at this point in the history
…pensearch-project#5072)

* Fail open requests on search shard failures (

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
anshu1106 authored and Anshu Agarwal committed Jan 10, 2023
1 parent b1ca51e commit 9807620
Show file tree
Hide file tree
Showing 15 changed files with 1,242 additions and 52 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update to Gradle 7.6 ([#5382](https://github.com/opensearch-project/OpenSearch/pull/5382))
- Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299))
- Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229))
- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843))
- [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Support to fail open requests on search shard failures with weighted traffic routing ([#5072](https://github.com/opensearch-project/OpenSearch/pull/5072))
- Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366))
- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495))
- Adding auto release workflow ([#5582](https://github.com/opensearch-project/OpenSearch/pull/5582))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -262,24 +263,25 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
tryNext(e, false);
}

private ShardRouting nextRoutingOrNull() {
private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();
ShardRouting next = FailAwareWeightedRouting.getInstance().findNext(shardsIt.get(shardIndex), clusterService.state(), failure);

if (next != null) {
return next;
}
moveToNextShard();
return nextRoutingOrNull();
return nextRoutingOrNull(failure);
}

private void moveToNextShard() {
++shardIndex;
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
ShardRouting shardRouting = nextRoutingOrNull();
ShardRouting shardRouting = nextRoutingOrNull(lastFailure);
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e);

final boolean lastShard = nextShard == null;
logger.debug(
() -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public String getClusterAlias() {
return clusterAlias;
}

SearchShardTarget nextOrNull() {
public SearchShardTarget nextOrNull() {
final String nodeId = targetNodesIterator.nextOrNull();
if (nodeId != null) {
return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -250,7 +251,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
// we set the shard failure always, even if its the first in the replication group, and the next one
// will work (it will just override it...)
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();
ShardRouting nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), e);

if (nextShard != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.FailAwareWeightedRouting;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -244,7 +245,8 @@ private void perform(@Nullable final Exception currentFailure) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
ShardRouting shardRouting = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), currentFailure);

if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
Expand Down Expand Up @@ -273,6 +275,7 @@ private void perform(@Nullable final Exception currentFailure) {
);
}
final Writeable.Reader<Response> reader = getResponseReader();
ShardRouting finalShardRouting = shardRouting;
transportService.sendRequest(
node,
transportShardAction,
Expand All @@ -296,7 +299,7 @@ public void handleResponse(final Response response) {

@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
onFailure(finalShardRouting, exp);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable<Metadata.Cust
public static final String VERSION = "_version";
public static final long INITIAL_VERSION = -1;
public static final long VERSION_UNSET_VALUE = -2;
public static final int WEIGHED_AWAY_WEIGHT = 0;

public long getVersion() {
return version;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchShardIterator;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.index.shard.ShardId;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchShardTarget;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
* This decides if retryable shard search requests can be tried on shard copies present in data
* nodes whose attribute value weight for weighted shard routing is set to zero.
*/

public enum FailAwareWeightedRouting {
INSTANCE;

private static final Logger logger = LogManager.getLogger(FailAwareWeightedRouting.class);

private final static List<RestStatus> internalErrorRestStatusList = List.of(
RestStatus.INTERNAL_SERVER_ERROR,
RestStatus.BAD_GATEWAY,
RestStatus.SERVICE_UNAVAILABLE,
RestStatus.GATEWAY_TIMEOUT
);

public static FailAwareWeightedRouting getInstance() {
return INSTANCE;

}

/**
* *
* @return true if exception is due to cluster availability issues
*/
private boolean isInternalFailure(Exception exception) {
if (exception instanceof OpenSearchException) {
// checking for 5xx failures
return internalErrorRestStatusList.contains(((OpenSearchException) exception).status());
}
return false;
}

/**
* This function checks if the shard is present in data node with weighted routing weight set to 0,
* In such cases we fail open, if shard search request for the shard from other shard copies fail with non
* retryable exception.
*
* @param nodeId the node with the shard copy
* @return true if the node has attribute value with shard routing weight set to zero, else false
*/
private boolean isWeighedAway(String nodeId, ClusterState clusterState) {
DiscoveryNode node = clusterState.nodes().get(nodeId);
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null && weightedRouting.isSet()) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);

for (Object key : keys.toArray()) {
if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) {
return true;
}
}
}
}
return false;
}

/**
* This function returns next shard copy to retry search request in case of failure from previous copy returned
* by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*
* @param shardIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @return the next shard copy
*/
public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) {
SearchShardTarget next = shardIt.nextOrNull();
while (next != null && isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
logger.info(
() -> new ParameterizedMessage("{}: Fail open executed due to exception {}", nextShard.getShardId(), exception)
);
break;
}
next = shardIt.nextOrNull();
}
return next;
}

/**
* This function returns next shard copy to retry search request in case of failure from previous copy returned
* by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @return the next shard copy
*/
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) {
ShardRouting next = shardsIt.nextOrNull();

while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception {}", nextShard.shardId(), exception));
break;
}
next = shardsIt.nextOrNull();
}
return next;
}

/**
* *
* @return true if can fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*/
private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) {
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
}

private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) {
List<ShardRouting> shards = clusterState.routingTable().shardRoutingTable(shardId).shards();
for (ShardRouting shardRouting : shards) {
if (!shardRouting.active()) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

package org.opensearch.cluster.routing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand All @@ -57,6 +60,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

Expand Down Expand Up @@ -89,6 +94,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap();

private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class);

/**
* The initializing list, including ones that are initializing on a target node because of relocation.
* If we can come up with a better variable name, it would be nice...
Expand Down Expand Up @@ -305,19 +312,50 @@ public ShardIterator activeInitializingShardsRankedIt(
*
* @param weightedRouting entity
* @param nodes discovered nodes in the cluster
* @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present
* in node attribute value with weight zero
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
public ShardIterator activeInitializingShardsWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
boolean isFailOpenEnabled
) {
final int seed = shuffler.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
List<ShardRouting> orderedListWithDistinctShards;
ordered.addAll(shuffler.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
// shard copies in case of request failure from other attribute values.
if (isFailOpenEnabled) {
try {
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);
keys.forEach(key -> {
ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes);
while (iterator.remaining() > 0) {
ordered.add(iterator.nextOrNull());
}
});
} catch (IllegalArgumentException e) {
// this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard
// copies found is zero
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}
orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList());
return new PlainShardIterator(shardId, orderedListWithDistinctShards);
}

/**
Expand Down
Loading

0 comments on commit 9807620

Please sign in to comment.