Skip to content

Commit

Permalink
Consume Queue Capacity from Node Config Cache. (opensearch-project#314)
Browse files Browse the repository at this point in the history
Consume current queue capacity from NodeConfigCollector.

A node level cache is already implemented which has updated values of the queues and caches. We need to read these values in the Decider implementation for the decider to compute the updated value of the Queue.

Co-authored-by: Aditya Jindal <aditjind@amazon.com>
  • Loading branch information
adityaj1107 and Aditya Jindal authored Jul 29, 2020
1 parent f1c0f03 commit 435a2e9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,33 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ModifyQueueCapacityAction;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.QueueRejectionClusterRca;

import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// This is a sample decider implementation to finalize decision maker interfaces.
// TODO: 1. Read action priorities from a configurable yml
// TODO: 2. Read current queue capacity from NodeConfigurationRca (PR #252)

public class QueueHealthDecider extends Decider {

private static final Logger LOG = LogManager.getLogger(Decider.class);
public static final String NAME = "queue_health";

private QueueRejectionClusterRca queueRejectionRca;
List<String> actionsByUserPriority = new ArrayList<>();
private int counter = 0;

public QueueHealthDecider(long evalIntervalSeconds, int decisionFrequency, QueueRejectionClusterRca queueRejectionClusterRca) {
// TODO: Also consume NodeConfigurationRca
super(evalIntervalSeconds, decisionFrequency);
this.queueRejectionRca = queueRejectionClusterRca;
configureActionPriority();
Expand Down Expand Up @@ -93,9 +95,20 @@ private void configureActionPriority() {
*/
private Action computeBestAction(NodeKey esNode, ResourceEnum threadPool) {
Action action = null;
int currQueueCapacity;
try {
currQueueCapacity = getNodeQueueCapacity(esNode, threadPool);
} catch (Exception e) {
// No action if value not present in the cache.
// Assumption here is the cache has been wiped off due to
// unforeseen events and we dont want to trigger any action.
LOG.error("Exception while reading values from Node Config Cache", e);
return null;
}

for (String actionName : actionsByUserPriority) {
action =
getAction(actionName, esNode, threadPool, getNodeQueueCapacity(esNode, threadPool), true);
getAction(actionName, esNode, threadPool, currQueueCapacity, true);
if (action != null) {
break;
}
Expand All @@ -121,10 +134,8 @@ private ModifyQueueCapacityAction configureQueueCapacity(NodeKey esNode, Resourc
}

private int getNodeQueueCapacity(NodeKey esNode, ResourceEnum threadPool) {
// TODO: use NodeConfigurationRca to return capacity, for now returning defaults
if (threadPool.equals(ResourceEnum.SEARCH_THREADPOOL)) {
return 1000;
}
return 100;
return (int) getAppContext().getNodeConfigCache().get(esNode, Resource.newBuilder()
.setResourceEnum(threadPool)
.setMetricEnum(MetricEnum.QUEUE_CAPACITY).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCH_QUEUE_CAPACITY;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.WRITE_QUEUE_CAPACITY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -27,6 +29,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.QueueRejectionClusterRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import java.util.ArrayList;
Expand Down Expand Up @@ -80,11 +83,20 @@ public void testHighRejectionRemediation() {
RcaTestHelper.generateFlowUnit("node3", "127.0.0.3", Resources.State.UNHEALTHY, ResourceUtil.SEARCH_QUEUE_REJECTION),
RcaTestHelper.generateFlowUnit("node4", "127.0.0.4", Resources.State.HEALTHY)
);
appContext.getNodeConfigCache().put(new NodeKey("node1", "127.0.0.1"),
SEARCH_QUEUE_CAPACITY,5000);
appContext.getNodeConfigCache().put(new NodeKey("node1", "127.0.0.1"),
WRITE_QUEUE_CAPACITY,5000);
appContext.getNodeConfigCache().put(new NodeKey("node2", "127.0.0.2"),
WRITE_QUEUE_CAPACITY,5000);
appContext.getNodeConfigCache().put(new NodeKey("node3", "127.0.0.3"),
SEARCH_QUEUE_CAPACITY,5000);

QueueRejectionClusterRca queueClusterRca = new QueueRejectionClusterRca(1, nodeRca);
queueClusterRca.setAppContext(appContext);
queueClusterRca.generateFlowUnitListFromLocal(null);
QueueHealthDecider decider = new QueueHealthDecider(5, 12, queueClusterRca);
decider.setAppContext(appContext);

// Since deciderFrequency is 12, the first 11 invocations return empty decision
for (int i = 0; i < 11; i++) {
Expand Down

0 comments on commit 435a2e9

Please sign in to comment.