Skip to content

Commit

Permalink
Fail open changes
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Nov 9, 2022
1 parent 01700df commit 02f1fa8
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -158,4 +162,95 @@ public void testSearchWithWRRShardRouting() throws IOException {
}
}

/**
 * Verifies that search requests still succeed ("fail open") when the only surviving shard copy
 * lives in a zone whose routing weight is zero.
 *
 * <p>The test starts one cluster manager node and one data node in each of the zones a, b and c,
 * indexes 100 documents into a one-shard/two-replica index, assigns weights a=1, b=1, c=0, stops
 * the data nodes of zones a and b, and then issues 50 searches. Every search must complete without
 * failed shards, every hit must have been served by a zone-c node, and every remaining data node
 * must report query and fetch activity.
 *
 * @throws IOException declared by the test signature
 */
public void testFailOpenOnSearch() throws IOException {

    Settings commonSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
        .build();

    int nodeCountPerAZ = 1;

    logger.info("--> starting a dedicated cluster manager node");
    internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

    logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
    List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
    );
    List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
        nodeCountPerAZ,
        Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
    );
    // The zone-c node names are not needed: that node is never stopped by name.
    internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build());

    logger.info("--> waiting for nodes to form a cluster");
    ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    ensureGreen();

    assertAcked(
        prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2))
    );
    ensureGreen();
    logger.info("--> creating indices for test");
    for (int i = 0; i < 100; i++) {
        client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get();
    }
    refresh("test");

    logger.info("--> setting shard routing weights for weighted round robin");
    Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
    WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

    ClusterPutWeightedRoutingResponse response = client().admin()
        .cluster()
        .prepareWeightedRouting()
        .setWeightedRouting(weightedRouting)
        .get();
    // JUnit convention: expected value first, actual value second.
    assertEquals(true, response.isAcknowledged());

    // Take down every zone that carries a non-zero weight; only the weighed-away zone c survives.
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(0)));
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_b.get(0)));
    ensureStableCluster(2);

    Set<String> hitNodes = new HashSet<>();
    // making search requests
    for (int i = 0; i < 50; i++) {
        SearchResponse searchResponse = internalCluster().smartClient()
            .prepareSearch("test")
            .setQuery(QueryBuilders.matchAllQuery())
            .get();
        assertEquals(0, searchResponse.getFailedShards());
        for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
            hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
        }
    }

    ClusterState stateAfterFailure = internalCluster().clusterService().state();
    ImmutableOpenMap<String, DiscoveryNode> dataNodes = stateAfterFailure.nodes().getDataNodes();
    List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
    for (Iterator<DiscoveryNode> it = dataNodes.valuesIt(); it.hasNext();) {
        DiscoveryNode node = it.next();
        // Constant-first equals: a node without a zone attribute must not trigger an NPE.
        if ("c".equals(node.getAttributes().get("zone"))) {
            nodeIdsFromZoneWithWeightZero.add(node.getId());
        }
    }

    // The hits can only have been served by the zone that was weighed away.
    Assert.assertFalse(hitNodes.isEmpty());
    Assert.assertTrue(nodeIdsFromZoneWithWeightZero.containsAll(hitNodes));

    NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
    for (NodeStats stat : nodeStats.getNodes()) {
        if (stat.getNode().isDataNode()) {
            SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
            Assert.assertTrue(searchStats.getQueryCount() > 0L);
            Assert.assertTrue(searchStats.getFetchCount() > 0L);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -261,24 +262,31 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
tryNext(e, false);
}

/**
 * Returns the next shard copy to try, advancing through {@code shardsIt} one iterator at a time.
 *
 * NOTE(review): this hunk is rendered without +/- diff markers. The no-argument signature and the
 * no-argument recursive call below appear to be the removed (pre-change) lines, and the
 * {@code Exception failure} variants the added ones -- confirm against the repository.
 *
 * @param failure the failure handed in by the caller; it is only forwarded to
 *                {@code WeightedRoutingHelper.isInternalFailure}
 * @return the next {@code ShardRouting}, or {@code null} when {@code shardsIt} is empty or
 *         {@code shardIndex} has moved past its last iterator
 */
private ShardRouting nextRoutingOrNull() {
private ShardRouting nextRoutingOrNull(Exception failure) {
// Nothing left to try: no iterators at all, or every iterator has been consumed.
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();

// Skip a copy that sits in a weighed-away zone unless the failure is classified as internal.
// Only one copy is skipped per call; the replacement copy is not checked again.
if (next != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(next, clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(failure)) {
next = shardsIt.get(shardIndex).nextOrNull();
}

if (next != null) {
return next;
}
// The current iterator is exhausted: move on to the next one and retry.
moveToNextShard();
return nextRoutingOrNull();
return nextRoutingOrNull(failure);
}

/** Advances {@code shardIndex} by one so that the following shard iterator is used next. */
private void moveToNextShard() {
    shardIndex++;
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
ShardRouting shardRouting = nextRoutingOrNull();
ShardRouting shardRouting = nextRoutingOrNull(lastFailure);
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,27 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.transport.NodeNotConnectedException;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand All @@ -70,6 +76,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator}
Expand Down Expand Up @@ -445,11 +452,48 @@ ShardSearchFailure[] buildShardFailures() {
return failures;
}

/**
 * Tells whether a shard failure is one of the exception types this class treats as "internal":
 * {@link NoShardAvailableActionException}, {@link UnavailableShardsException},
 * {@link NodeNotConnectedException} or {@link NodeClosedException}.
 *
 * <p>The check is made on the result of {@link ExceptionsHelper#unwrapCause(Throwable)} rather
 * than on {@code e} itself, so a wrapped failure is classified by its underlying cause.
 *
 * @param e the failure reported for a shard; may be {@code null}
 * @return {@code true} if the unwrapped cause is one of the listed types, {@code false} otherwise
 */
private boolean isInternalFailure(Exception e) {
    final Throwable cause = ExceptionsHelper.unwrapCause(e);
    return cause instanceof NoShardAvailableActionException
        || cause instanceof UnavailableShardsException
        || cause instanceof NodeNotConnectedException
        || cause instanceof NodeClosedException;
}

/**
 * Tells whether the node that hosts {@code nextShard} carries an awareness attribute value whose
 * routing weight is zero in the current cluster state.
 *
 * <p>Returns {@code false} instead of throwing when no weighted routing is configured, when no
 * attribute value has weight zero, when the target node is not part of the cluster state, or when
 * the node does not define the weighted routing attribute.
 *
 * @param nextShard the shard copy that is about to be tried
 * @return {@code true} if the hosting node's attribute value has weight zero, {@code false} otherwise
 */
private boolean shardInWeighedAwayAZ(SearchShardTarget nextShard) {
    WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
    if (weightedRoutingMetadata == null) {
        return false;
    }
    WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
    if (weightedRouting == null) {
        return false;
    }

    DiscoveryNode targetNode = clusterState.nodes().get(nextShard.getNodeId());
    if (targetNode == null) {
        // The node is not in this cluster state, so its zone cannot be determined.
        return false;
    }
    // Use the configured attribute name rather than assuming it is always "zone".
    String nodeAttributeValue = targetNode.getAttributes().get(weightedRouting.attributeName());
    if (nodeAttributeValue == null) {
        return false;
    }

    // Fetch weighted routing attributes with weight set as zero. The comparison is made on the
    // double value so that fractional weights such as 0.5 are not truncated to zero.
    Stream<String> weighedAwayValues = weightedRouting.weights()
        .entrySet()
        .stream()
        .filter(entry -> entry.getValue().doubleValue() == 0.0)
        .map(Map.Entry::getKey);
    // Match against every zero-weight value, not only the first one the map happens to yield.
    return weighedAwayValues.anyMatch(nodeAttributeValue::equals);
}

private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
SearchShardTarget nextShard = shardIt.nextOrNull();

if (nextShard != null && shardInWeighedAwayAZ(nextShard) && !isInternalFailure(e)) {
nextShard = shardIt.nextOrNull();
}
final boolean lastShard = nextShard == null;
logger.debug(
() -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -251,6 +252,12 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
// will work (it will just override it...)
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();

if (nextShard != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard, clusterState)
&& !WeightedRoutingHelper.isInternalFailure(e)) {
nextShard = shardIt.nextOrNull();
}
if (nextShard != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -244,7 +245,12 @@ private void perform(@Nullable final Exception currentFailure) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting, clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(currentFailure)) {
shardRouting = shardIt.nextOrNull();
}
if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
Expand Down Expand Up @@ -273,6 +279,7 @@ private void perform(@Nullable final Exception currentFailure) {
);
}
final Writeable.Reader<Response> reader = getResponseReader();
ShardRouting finalShardRouting = shardRouting;
transportService.sendRequest(
node,
transportShardAction,
Expand All @@ -296,7 +303,7 @@ public void handleResponse(final Response response) {

@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
onFailure(finalShardRouting, exp);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

Expand Down Expand Up @@ -317,6 +318,20 @@ public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weighted
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}

// append shards for attribute value with weight zero to the end, so that shard search requests can be tried on
// shard copies in case of request failure from other attribute values.
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == 0)
.map(Map.Entry::getKey);
if (keys != null) {
ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt("zone:" + keys.findFirst().get(), nodes);
while (iterator.remaining() > 0) {
ordered.add(iterator.nextOrNull());
}
}
return new PlainShardIterator(shardId, ordered);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.transport.NodeNotConnectedException;

import java.util.Map;
import java.util.stream.Stream;

/**
 * Static helpers used when retrying a shard-level request under weighted routing: they classify a
 * failure and tell whether a shard copy lives on a node whose awareness attribute value has been
 * given weight zero.
 */
public class WeightedRoutingHelper {

    /**
     * Tells whether a failure is one of the exception types treated as "internal":
     * {@link NoShardAvailableActionException}, {@link UnavailableShardsException} or
     * {@link NodeNotConnectedException}.
     *
     * <p>The check is made on the result of {@link ExceptionsHelper#unwrapCause(Throwable)} rather
     * than on {@code e} itself, so a wrapped failure is classified by its underlying cause.
     *
     * @param e the failure to classify; may be {@code null}
     * @return {@code true} if the unwrapped cause is one of the listed types, {@code false} otherwise
     */
    public static boolean isInternalFailure(Exception e) {
        final Throwable cause = ExceptionsHelper.unwrapCause(e);
        return cause instanceof NoShardAvailableActionException
            || cause instanceof UnavailableShardsException
            || cause instanceof NodeNotConnectedException;
    }

    /**
     * Tells whether the node currently hosting {@code nextShard} carries an awareness attribute
     * value whose routing weight is zero in {@code clusterState}.
     *
     * <p>Returns {@code false} instead of throwing when no weighted routing is configured, when no
     * attribute value has weight zero, when the hosting node is not part of the cluster state, or
     * when the node does not define the weighted routing attribute.
     *
     * @param nextShard    the shard copy that is about to be tried
     * @param clusterState the cluster state supplying the nodes and the weighted routing metadata
     * @return {@code true} if the hosting node's attribute value has weight zero, {@code false} otherwise
     */
    public static boolean shardInWeighedAwayAZ(ShardRouting nextShard, ClusterState clusterState) {
        WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
        if (weightedRoutingMetadata == null) {
            return false;
        }
        WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
        if (weightedRouting == null) {
            return false;
        }

        DiscoveryNode targetNode = clusterState.nodes().get(nextShard.currentNodeId());
        if (targetNode == null) {
            // The node is not in this cluster state, so its zone cannot be determined.
            return false;
        }
        // Use the configured attribute name rather than assuming it is always "zone".
        String nodeAttributeValue = targetNode.getAttributes().get(weightedRouting.attributeName());
        if (nodeAttributeValue == null) {
            return false;
        }

        // Fetch weighted routing attributes with weight set as zero. The comparison is made on the
        // double value so that fractional weights such as 0.5 are not truncated to zero.
        Stream<String> weighedAwayValues = weightedRouting.weights()
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().doubleValue() == 0.0)
            .map(Map.Entry::getKey);
        // Match against every zero-weight value, not only the first one the map happens to yield.
        return weighedAwayValues.anyMatch(nodeAttributeValue::equals);
    }

}

0 comments on commit 02f1fa8

Please sign in to comment.