Skip to content

Commit

Permalink
Fix spotless java
Browse files Browse the repository at this point in the history
Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com>
  • Loading branch information
pranikum committed Sep 25, 2022
1 parent 5274f07 commit 79f570c
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ private ClusterPutWRRWeightsAction() {
super(NAME, ClusterPutWRRWeightsResponse::new);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ public void setWRRWeight(Map<String, String> source) {

public void setWRRWeight(BytesReference source, XContentType contentType) {
try (
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
contentType
)
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
contentType
)
) {
String attrValue = null;
Map<String, Object> weights = new HashMap<>();
Expand Down Expand Up @@ -161,4 +161,4 @@ public void writeTo(StreamOutput out) throws IOException {
public String toString() {
return "ClusterPutWRRWeightsRequest{" + "wrrWeight= " + wrrWeight.toString() + "}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ public ClusterPutWRRWeightsResponse(boolean acknowledged) {
public ClusterPutWRRWeightsResponse(StreamInput in) throws IOException {
super(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsAction;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsRequest;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand All @@ -32,14 +39,17 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -267,4 +277,159 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}

public void handleNodesDecommissionRequest(
Set<DiscoveryNode> nodesToBeDecommissioned,
List<String> zones,
String reason,
TimeValue timeout,
TimeValue timeoutForNodeDecommission,
ActionListener<Void> nodesRemovedListener
) {
setWeightForDecommissionedZone(zones);
checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, timeoutForNodeDecommission, nodesRemovedListener);
}

private void setWeightForDecommissionedZone(List<String> zones) {
ClusterState clusterState = clusterService.getClusterApplierService().state();

DecommissionAttributeMetadata decommissionAttributeMetadata = clusterState.metadata().custom(DecommissionAttributeMetadata.TYPE);
assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT)
: "unexpected status encountered while decommissioning nodes";
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();

Map<String, String> weights = new HashMap<>();
zones.forEach(zone -> {
if (zone.equalsIgnoreCase(decommissionAttribute.attributeValue())) {
weights.put(zone, "0");
} else {
weights.put(zone, "1");
}
});

// WRR API will validate invalid weights
final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest();
clusterWeightRequest.attributeName("zone");
clusterWeightRequest.setWRRWeight(weights);

transportService.sendRequest(
transportService.getLocalNode(),
ClusterPutWRRWeightsAction.NAME,
clusterWeightRequest,
new TransportResponseHandler<ClusterPutWRRWeightsResponse>() {
@Override
public void handleResponse(ClusterPutWRRWeightsResponse response) {
logger.info("Weights are successfully set.");
}

@Override
public void handleException(TransportException exp) {
// Logging warn message on failure. Should we do Retry? If weights are not set should we fail?
logger.warn("Exception occurred while setting weights.Exception Messages - [{}]", exp.unwrapCause().getMessage());
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClusterPutWRRWeightsResponse read(StreamInput in) throws IOException {
return new ClusterPutWRRWeightsResponse(in);
}
}
);
}

public void checkHttpStatsForDecommissionedNodes(
Set<DiscoveryNode> decommissionedNodes,
String reason,
TimeValue timeout,
TimeValue timeoutForNodeDecommission,
ActionListener<Void> listener
) {

if (timeoutForNodeDecommission.getSeconds() > 0) {
// Wait for timeout to happen. Log the active connection before decommissioning of nodes.
scheduleDecommissionNodesRequestCheck(decommissionedNodes, reason, timeout, listener, timeoutForNodeDecommission);
} else {
getActiveRequestCountOnDecommissionNodes(decommissionedNodes);
removeDecommissionedNodes(decommissionedNodes, reason, timeout, listener);
}
}

private void logActiveConnections(NodesStatsResponse nodesStatsResponse) {
Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
for (int i = 0; i < responseNodes.size(); i++) {
HttpStats httpStats = responseNodes.get(i).getHttp();
DiscoveryNode node = responseNodes.get(i).getNode();
nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen());
}
logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap);
}

private void scheduleDecommissionNodesRequestCheck(
Set<DiscoveryNode> decommissionedNodes,
String reason,
TimeValue timeout,
ActionListener<Void> nodesRemovedListener,
TimeValue timeoutForNodeDecommission
) {
transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
// Check for active connections.
getActiveRequestCountOnDecommissionNodes(decommissionedNodes);
removeDecommissionedNodes(decommissionedNodes, reason, timeout, nodesRemovedListener);
}

@Override
public String toString() {
return "";
}
}, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME);
}

private void getActiveRequestCountOnDecommissionNodes(Set<DiscoveryNode> decommissionedNodes) {
if (decommissionedNodes == null || decommissionedNodes.isEmpty()) {
return;
}
String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new);

if (nodes.length == 0) {
return;
}

final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes);
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName());

transportService.sendRequest(
transportService.getLocalNode(),
NodesStatsAction.NAME,
nodesStatsRequest,
new TransportResponseHandler<NodesStatsResponse>() {
@Override
public void handleResponse(NodesStatsResponse response) {
logActiveConnections(response);
}

@Override
public void handleException(TransportException exp) {
logger.warn("Failure occurred while dumping connection for decommission nodes. [{}]", exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public NodesStatsResponse read(StreamInput in) throws IOException {
return new NodesStatsResponse(in);
}
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
*/
public void startDecommissionAction(
final DecommissionAttribute decommissionAttribute,
final ActionListener<ClusterStateUpdateResponse> listener
final ActionListener<ClusterStateUpdateResponse> listener,
final TimeValue timeOutForNodeDecommission
) {
// register the metadata with status as INIT as first step
clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) {
Expand Down Expand Up @@ -156,14 +157,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
decommissionAttributeMetadata.decommissionAttribute(),
decommissionAttributeMetadata.status()
);
decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener);
decommissionClusterManagerNodes(
decommissionAttributeMetadata.decommissionAttribute(),
listener,
timeOutForNodeDecommission
);
}
});
}

private synchronized void decommissionClusterManagerNodes(
final DecommissionAttribute decommissionAttribute,
ActionListener<ClusterStateUpdateResponse> listener
ActionListener<ClusterStateUpdateResponse> listener,
TimeValue timeOutForNodeDecommission
) {
ClusterState state = clusterService.getClusterApplierService().state();
// since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further
Expand Down Expand Up @@ -206,7 +212,7 @@ public void onResponse(Void unused) {
// and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission
// nodes can be part of Voting Config
listener.onResponse(new ClusterStateUpdateResponse(true));
failDecommissionedNodes(clusterService.getClusterApplierService().state());
failDecommissionedNodes(clusterService.getClusterApplierService().state(), timeOutForNodeDecommission);
}
} else {
// explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager
Expand Down Expand Up @@ -303,19 +309,26 @@ public void onFailure(Exception e) {
}
}

private void failDecommissionedNodes(ClusterState state) {
private void failDecommissionedNodes(ClusterState state, TimeValue timeOutForNodeDecommission) {
// this method ensures no matter what, we always exit from this function after clearing the voting config exclusion
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();

// Awareness values refers to all zones in the cluster
List<String> awarenessValues = forcedAwarenessAttributes.get(decommissionAttribute.attributeName());

decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.IN_PROGRESS, new ActionListener<>() {
@Override
public void onResponse(DecommissionStatus status) {
logger.info("updated the decommission status to [{}]", status);
// execute nodes decommissioning
decommissionController.removeDecommissionedNodes(

decommissionController.handleNodesDecommissionRequest(
filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false),
awarenessValues,
"nodes-decommissioned",
TimeValue.timeValueSeconds(120L),
timeOutForNodeDecommission,
new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,10 +1188,10 @@ public IndexMetadata getSafe(Index index) {
return indexMetadata;
}
throw new IndexNotFoundException(
index,
new IllegalStateException(
"index uuid doesn't match expected: [" + index.getUUID() + "] but got: [" + indexMetadata.getIndexUUID() + "]"
)
index,
new IllegalStateException(
"index uuid doesn't match expected: [" + index.getUUID() + "] but got: [" + indexMetadata.getIndexUUID() + "]"
)
);
}
throw new IndexNotFoundException(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,4 @@ public static void toXContent(WRRWeights wrrWeight, XContentBuilder builder) thr
public String toString() {
return Strings.toString(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ public Map<String, Object> weights() {
public String attributeName() {
return this.attributeName;
}
}
}

0 comments on commit 79f570c

Please sign in to comment.