Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weighted round-robin scheduling policy for shard coordination traffic… #4241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
162cbeb
Weighted round-robin scheduling policy for shard coordination traffic…
Aug 17, 2022
8513c4f
Add caching layer for wrr shard routing and moved wrr routing call to…
Aug 26, 2022
4cdbee3
Integrate ARS with weighted round robin,
Aug 27, 2022
7a49e5c
Remove ARS and add tests for zone with undefined weight
Sep 1, 2022
03c4d23
Add changelog for the commit
Sep 1, 2022
7c64537
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 1, 2022
154a3b9
Fix java doc, add test of WeightedRoundRobinRouting metadata
Sep 1, 2022
3afb4e3
Fix minor change related to shuffling wrr shard routings
Sep 1, 2022
30e17bc
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 2, 2022
3aa6fbb
Add default weight 1 for zones with undefined weight
Sep 7, 2022
9ae063c
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 7, 2022
1112366
Inject WRRShardsCache on node start
Sep 12, 2022
cd3e16c
Remove extra new lines
Sep 12, 2022
0c7dbd9
Fix import
Sep 12, 2022
438fe9e
Add size for shard routing cache
Sep 12, 2022
694be4d
Invalidate shard routing cache on close
Sep 12, 2022
9236b8d
Refactor code
Sep 13, 2022
93e7587
Add test for Weighted routing iterator and some refactoring changes
Sep 13, 2022
a9e30d1
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 13, 2022
17dc58c
Update metadata minimal supported version
Sep 13, 2022
ed0cc1b
Add cluster setting for default weight
Sep 14, 2022
5a5cb11
Fix tests due to the change
Sep 14, 2022
1c33964
Fix cache concurrency issue
Sep 14, 2022
f8638f2
Spotless check fix
Sep 14, 2022
4016d27
Fix weighted round robin logic case when there is an entity with weig…
Sep 15, 2022
c98606d
Changes weight data type to double
Sep 15, 2022
1bd83f3
Fix test
Sep 15, 2022
8a72dc0
Empty commit
Sep 15, 2022
bff61b9
Empty commit
Sep 15, 2022
9102616
Fix spotless check
Sep 15, 2022
a291634
Create in-memory cache for shard routings
Sep 16, 2022
bd6c9e7
Fix put operation for weighted shard routings
Sep 16, 2022
d9368ab
Add tests for shard routing in-memory store
Sep 17, 2022
a3f0290
Merge branch 'main' into feature/wrr-shard-routing-core
Sep 17, 2022
01239bf
Add java docs and some code refactoring
Sep 17, 2022
dbbb263
Add null check for discovery nodes and single mutex for weighted shar…
Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Weighted round-robin scheduling policy for shard coordination traffic ([#4241](https://github.com/opensearch-project/OpenSearch/pull/4241))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -191,6 +192,7 @@ public static List<Entry> getNamedWriteables() {
ComposableIndexTemplateMetadata::readDiffFrom
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down Expand Up @@ -274,6 +276,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
DataStreamMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(WeightedRoutingMetadata.TYPE),
WeightedRoutingMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,14 @@ public IndexGraveyard indexGraveyard() {
return custom(IndexGraveyard.TYPE);
}

/**
* *
* @return The weighted routing metadata for search requests
*/
public WeightedRoutingMetadata weightedRoutingMetadata() {
return custom(WeightedRoutingMetadata.TYPE);
}

public <T extends Custom> T custom(String type) {
return (T) customs.get(type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractNamedDiffable;
import org.opensearch.cluster.NamedDiff;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/**
* Contains metadata for weighted routing
*
* @opensearch.internal
*/
public class WeightedRoutingMetadata extends AbstractNamedDiffable<Metadata.Custom> implements Metadata.Custom {
private static final Logger logger = LogManager.getLogger(WeightedRoutingMetadata.class);
public static final String TYPE = "weighted_shard_routing";
public static final String AWARENESS = "awareness";
private WeightedRouting weightedRouting;

public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

public WeightedRoutingMetadata setWeightedRouting(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
return this;
}

public WeightedRoutingMetadata(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
}
}

public WeightedRoutingMetadata(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
}

@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.API_AND_GATEWAY;
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_2_4_0;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
}
}

public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Metadata.Custom.class, TYPE, in);
}

public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Double attrValue;
String attributeName = null;
Map<String, Double> weights = new HashMap<>();
WeightedRouting weightedRouting = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
String awarenessField = null;

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
awarenessField = parser.currentName();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata [{}], expected " + "object",
awarenessField
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
attributeName = parser.currentName();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata [{}], expected" + " object",
attributeName
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
attrValue = Double.parseDouble(parser.text());
weights.put(attrKey, attrValue);
} else {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata attribute " + "[{}], unknown type",
attributeName
);
}
}
}
}
}
weightedRouting = new WeightedRouting(attributeName, weights);
return new WeightedRoutingMetadata(weightedRouting);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingMetadata that = (WeightedRoutingMetadata) o;
return weightedRouting.equals(that.weightedRouting);
}

@Override
public int hashCode() {
return weightedRouting.hashCode();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(weightedRouting, builder);
return builder;
}

public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException {
builder.startObject(AWARENESS);
builder.startObject(weightedRouting.attributeName());
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = emptyMap();
private volatile Map<AttributesKey, AttributesRoutings> initializingShardsByAttributes = emptyMap();
private final Object shardsByAttributeMutex = new Object();
private final Object shardsByWeightMutex = new Object();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap();

/**
* The initializing list, including ones that are initializing on a target node because of relocation.
Expand Down Expand Up @@ -233,6 +236,10 @@ public List<ShardRouting> assignedShards() {
return this.assignedShards;
}

public Map<WeightedRoutingKey, List<ShardRouting>> getActiveShardsByWeight() {
return activeShardsByWeight;
}

public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(shards));
}
Expand Down Expand Up @@ -292,6 +299,73 @@ public ShardIterator activeInitializingShardsRankedIt(
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted
* round-robin scheduling policy.
*
* @param weightedRouting entity
* @param nodes discovered nodes in the cluster
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
final int seed = shuffler.nextSeed();
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(shuffler.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns a list containing shard routings ordered using weighted round-robin scheduling.
*/
private List<ShardRouting> shardsOrderedByWeight(
List<ShardRouting> shards,
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight
) {
WeightedRoundRobin<ShardRouting> weightedRoundRobin = new WeightedRoundRobin<>(
calculateShardWeight(shards, weightedRouting, nodes, defaultWeight)
);
List<WeightedRoundRobin.Entity<ShardRouting>> shardsOrderedbyWeight = weightedRoundRobin.orderEntities();
List<ShardRouting> orderedShardRouting = new ArrayList<>(activeShards.size());
if (shardsOrderedbyWeight != null) {
for (WeightedRoundRobin.Entity<ShardRouting> shardRouting : shardsOrderedbyWeight) {
orderedShardRouting.add(shardRouting.getTarget());
}
}
return orderedShardRouting;
}

/**
* Returns a list containing shard routing and associated weight. This function iterates through all the shards and
* uses weighted routing to find weight for the corresponding shard. This is fed to weighted round-robin scheduling
* to order shards by weight.
*/
private List<WeightedRoundRobin.Entity<ShardRouting>> calculateShardWeight(
List<ShardRouting> shards,
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight
) {
List<WeightedRoundRobin.Entity<ShardRouting>> shardsWithWeights = new ArrayList<>();
for (ShardRouting shard : shards) {
DiscoveryNode node = nodes.get(shard.currentNodeId());
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
if (node != null) {
String attVal = node.getAttributes().get(weightedRouting.attributeName());
// If weight for a zone is not defined, considering it as 1 by default
Double weight = weightedRouting.weights().getOrDefault(attVal, defaultWeight);
shardsWithWeights.add(new WeightedRoundRobin.Entity<>(weight, shard));
}
}
return shardsWithWeights;
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
Expand Down Expand Up @@ -698,6 +772,66 @@ public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) {
return count;
}

/**
* Key for WeightedRouting Shard Iterator
*
* @opensearch.internal
*/
public static class WeightedRoutingKey {
private final WeightedRouting weightedRouting;

public WeightedRoutingKey(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingKey key = (WeightedRoutingKey) o;
if (!weightedRouting.equals(key.weightedRouting)) return false;
return true;
}

@Override
public int hashCode() {
int result = weightedRouting.hashCode();
return result;
}
}

/**
* *
* Gets active shard routing from memory if available, else calculates and put it in memory.
*/
private List<ShardRouting> getActiveShardsByWeight(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
WeightedRoutingKey key = new WeightedRoutingKey(weightedRouting);
List<ShardRouting> shardRoutings = activeShardsByWeight.get(key);
if (shardRoutings == null) {
synchronized (shardsByWeightMutex) {
shardRoutings = shardsOrderedByWeight(activeShards, weightedRouting, nodes, defaultWeight);
activeShardsByWeight = new MapBuilder().put(key, shardRoutings).immutableMap();
}
}
return shardRoutings;
}

/**
* *
* Gets initializing shard routing from memory if available, else calculates and put it in memory.
*/
private List<ShardRouting> getInitializingShardsByWeight(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
WeightedRoutingKey key = new WeightedRoutingKey(weightedRouting);
List<ShardRouting> shardRoutings = initializingShardsByWeight.get(key);
if (shardRoutings == null) {
synchronized (shardsByWeightMutex) {
shardRoutings = shardsOrderedByWeight(activeShards, weightedRouting, nodes, defaultWeight);
initializingShardsByWeight = new MapBuilder().put(key, shardRoutings).immutableMap();
}
}
return shardRoutings;
}

/**
* Builder of an index shard routing table.
*
Expand Down
Loading