Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Weighted Shard Routing] API versioning #5255

Merged
merged 28 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert version reset on delete
Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Jan 4, 2023
commit 04b7db1063155aee369dc0121e59193d574ea833
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -362,7 +361,6 @@ public void testPutAndDeleteWithVersioning() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("weighted_routing.gc_deletes", 10, TimeUnit.SECONDS)
.build();

logger.info("--> starting 6 nodes on different zones");
Expand Down Expand Up @@ -438,16 +436,17 @@ public void testPutAndDeleteWithVersioning() throws Exception {
.get();
assertTrue(deleteResponse.isAcknowledged());

// sleeping for 10 sec to ensure that weighted routing weights is gc deleted ie version is reset to -1
Thread.sleep(10000);

// update weights again and make sure that version number got updated on delete
weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(-1).get();
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights
deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get();
assertTrue(deleteResponse.isAcknowledged());

// delete weights call, incorrect version number
UnsupportedWeightedRoutingStateException deleteException = expectThrows(
UnsupportedWeightedRoutingStateException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@
import org.opensearch.common.inject.Inject;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
Expand All @@ -46,7 +43,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.ValidateActions.addValidationError;
Expand All @@ -62,22 +58,6 @@ public class WeightedRoutingService {
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;
private static final Double DECOMMISSIONED_AWARENESS_VALUE_WEIGHT = 0.0;
volatile Scheduler.Cancellable cancellable;

/**
* setting to enable / disable weighted routing deletes garbage collection.
* This setting is realtime updatable
*/
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final Setting<TimeValue> WEIGHTS_GC_DELETES_SETTING = Setting.timeSetting(
"weighted_routing.gc_deletes",
DEFAULT_GC_DELETES,
new TimeValue(-1, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();

@Inject
public WeightedRoutingService(
Expand All @@ -99,16 +79,6 @@ public WeightedRoutingService(
this::setForcedAwarenessAttributes
);

this.gcDeletesInMillis = WEIGHTS_GC_DELETES_SETTING.get(settings).getMillis();
clusterSettings.addSettingsUpdateConsumer(WEIGHTS_GC_DELETES_SETTING, this::setGCDeletes);
}

private void setGCDeletes(TimeValue timeValue) {
this.gcDeletesInMillis = timeValue.getMillis();
}

public long getGcDeletesInMillis() {
return gcDeletesInMillis;
}

public void registerWeightedRoutingMetadata(
Expand Down Expand Up @@ -208,7 +178,6 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("cluster weighted routing metadata change is processed by all the nodes");
listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true));
scheduleVersionReset();
}
});
}
Expand Down Expand Up @@ -338,50 +307,4 @@ private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetad
);
}
}

private void scheduleVersionReset() {
cancellable = this.threadPool.schedule(
resetWeightedRoutingVersion(),
TimeValue.timeValueMillis(getGcDeletesInMillis()),
ThreadPool.Names.SAME
);

}

private Runnable resetWeightedRoutingVersion() {
clusterService.submitStateUpdateTask("reset_weighted_routing_version", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
logger.info("Resetting weighted routing weights version in cluster state metadata");
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) {
weightedRoutingMetadata = new WeightedRoutingMetadata(
weightedRoutingMetadata.getWeightedRouting(),
WeightedRoutingMetadata.INITIAL_VERSION
);
}

mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata);
logger.info("building cluster state after resetting weighted routing version ");
return ClusterState.builder(currentState).metadata(mdBuilder).build();
}

@Override
public void onFailure(String source, Exception e) {
logger.error("failed to reset weighted routing weights version in cluster metadata", e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("cluster weighted routing version reset is processed by all the nodes");
if (cancellable != null) {
cancellable.cancel();
}
}
});

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package org.opensearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.opensearch.cluster.routing.WeightedRoutingService;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
Expand Down Expand Up @@ -539,7 +538,6 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING,
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
WeightedRoutingService.WEIGHTS_GC_DELETES_SETTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public void testValidate_AttributeMissing() {
}

public void testValidate_MoreThanHalfWithZeroWeight() {
String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," +
"\"_version\":1}";
String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + "\"_version\":1}";
ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON);
ActionRequestValidationException actionRequestValidationException = request.validate();
Expand Down