From 435a2e9c0c7faeb458bb2d8de7b80061a36e47d9 Mon Sep 17 00:00:00 2001 From: Aditya Jindal Date: Wed, 29 Jul 2020 15:49:47 -0700 Subject: [PATCH] Consume Queue Capacity from Node Config Cache. (#314) Consume current queue capacity from NodeConfigCollector. A node level cache is already implemented which has updated values of the queues and caches. We need to read these values in the Decider implementation for the decider to compute the updated value of the Queue. Co-authored-by: Aditya Jindal --- .../deciders/QueueHealthDecider.java | 29 +++++++++++++------ .../deciders/QueueHealthDeciderTest.java | 12 ++++++++ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDecider.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDecider.java index 10b87611d..be990f069 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDecider.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDecider.java @@ -17,6 +17,8 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action; import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ModifyQueueCapacityAction; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; @@ -24,16 +26,17 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.QueueRejectionClusterRca; - import java.util.ArrayList; import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; // This is a sample decider implementation to finalize decision maker interfaces. // TODO: 1. Read action priorities from a configurable yml -// TODO: 2. Read current queue capacity from NodeConfigurationRca (PR #252) public class QueueHealthDecider extends Decider { + private static final Logger LOG = LogManager.getLogger(Decider.class); public static final String NAME = "queue_health"; private QueueRejectionClusterRca queueRejectionRca; @@ -41,7 +44,6 @@ public class QueueHealthDecider extends Decider { private int counter = 0; public QueueHealthDecider(long evalIntervalSeconds, int decisionFrequency, QueueRejectionClusterRca queueRejectionClusterRca) { - // TODO: Also consume NodeConfigurationRca super(evalIntervalSeconds, decisionFrequency); this.queueRejectionRca = queueRejectionClusterRca; configureActionPriority(); @@ -93,9 +95,20 @@ private void configureActionPriority() { */ private Action computeBestAction(NodeKey esNode, ResourceEnum threadPool) { Action action = null; + int currQueueCapacity; + try { + currQueueCapacity = getNodeQueueCapacity(esNode, threadPool); + } catch (Exception e) { + // No action if value not present in the cache. + // Assumption here is the cache has been wiped off due to + // unforeseen events and we dont want to trigger any action. + LOG.error("Exception while reading values from Node Config Cache", e); + return null; + } + for (String actionName : actionsByUserPriority) { action = - getAction(actionName, esNode, threadPool, getNodeQueueCapacity(esNode, threadPool), true); + getAction(actionName, esNode, threadPool, currQueueCapacity, true); if (action != null) { break; } @@ -121,10 +134,8 @@ private ModifyQueueCapacityAction configureQueueCapacity(NodeKey esNode, Resourc } private int getNodeQueueCapacity(NodeKey esNode, ResourceEnum threadPool) { - // TODO: use NodeConfigurationRca to return capacity, for now returning defaults - if (threadPool.equals(ResourceEnum.SEARCH_THREADPOOL)) { - return 1000; - } - return 100; + return (int) getAppContext().getNodeConfigCache().get(esNode, Resource.newBuilder() + .setResourceEnum(threadPool) + .setMetricEnum(MetricEnum.QUEUE_CAPACITY).build()); } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDeciderTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDeciderTest.java index 401848060..3c7ba4520 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDeciderTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/QueueHealthDeciderTest.java @@ -15,6 +15,8 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCH_QUEUE_CAPACITY; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.WRITE_QUEUE_CAPACITY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -27,6 +29,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.QueueRejectionClusterRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; import java.util.ArrayList; @@ -80,11 +83,20 @@ public void testHighRejectionRemediation() { RcaTestHelper.generateFlowUnit("node3", "127.0.0.3", Resources.State.UNHEALTHY, ResourceUtil.SEARCH_QUEUE_REJECTION), RcaTestHelper.generateFlowUnit("node4", "127.0.0.4", Resources.State.HEALTHY) ); + appContext.getNodeConfigCache().put(new NodeKey("node1", "127.0.0.1"), + SEARCH_QUEUE_CAPACITY,5000); + appContext.getNodeConfigCache().put(new NodeKey("node1", "127.0.0.1"), + WRITE_QUEUE_CAPACITY,5000); + appContext.getNodeConfigCache().put(new NodeKey("node2", "127.0.0.2"), + WRITE_QUEUE_CAPACITY,5000); + appContext.getNodeConfigCache().put(new NodeKey("node3", "127.0.0.3"), + SEARCH_QUEUE_CAPACITY,5000); QueueRejectionClusterRca queueClusterRca = new QueueRejectionClusterRca(1, nodeRca); queueClusterRca.setAppContext(appContext); queueClusterRca.generateFlowUnitListFromLocal(null); QueueHealthDecider decider = new QueueHealthDecider(5, 12, queueClusterRca); + decider.setAppContext(appContext); // Since deciderFrequency is 12, the first 11 invocations return empty decision for (int i = 0; i < 11; i++) {