Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add FIPS build tooling ([#4254](https://github.com/opensearch-project/security/issues/4254))
- Support Nested Aggregations as part of Star-Tree ([#18048](https://github.com/opensearch-project/OpenSearch/pull/18048))
- [Star-Tree] Support for date-range queries with star-tree supported aggregations ([#17855](https://github.com/opensearch-project/OpenSearch/pull/17855))
- Added node-left metric to cluster manager ([#18421](https://github.com/opensearch-project/OpenSearch/pull/18421))
- [Star tree] Remove star tree feature flag and add index setting to configure star tree search on index basis ([#18070](https://github.com/opensearch-project/OpenSearch/pull/18070))
- Approximation Framework Enhancement: Update the BKD traversal logic to improve the performance on skewed data ([#18439](https://github.com/opensearch-project/OpenSearch/issues/18439))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
*/
public final class ClusterManagerMetrics {

public static final String FOLLOWER_NODE_ID_TAG = "follower_node_id";
public static final String REASON_TAG = "reason";
private static final String LATENCY_METRIC_UNIT_MS = "ms";
private static final String COUNTER_METRICS_UNIT = "1";

Expand All @@ -36,6 +38,7 @@ public final class ClusterManagerMetrics {
public final Counter followerChecksFailureCounter;
public final Counter asyncFetchFailureCounter;
public final Counter asyncFetchSuccessCounter;
public final Counter nodeLeftCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
Expand Down Expand Up @@ -83,7 +86,7 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
"Counter for number of successful async fetches",
COUNTER_METRICS_UNIT
);

nodeLeftCounter = metricsRegistry.createCounter("node.left.count", "Counter for node left operation", COUNTER_METRICS_UNIT);
}

public void recordLatency(Histogram histogram, Double value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand All @@ -111,6 +112,12 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.ClusterManagerMetrics.FOLLOWER_NODE_ID_TAG;
import static org.opensearch.cluster.ClusterManagerMetrics.REASON_TAG;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_DISCONNECTED;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_HEALTHCHECK_FAIL;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_LAGGING;
import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
Expand Down Expand Up @@ -193,6 +200,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final ClusterSettings clusterSettings;
private final ClusterManagerMetrics clusterManagerMetrics;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -250,6 +258,7 @@ public Coordinator(
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout);
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.clusterManagerMetrics = clusterManagerMetrics;
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(
Expand Down Expand Up @@ -359,6 +368,20 @@ private void removeNode(DiscoveryNode discoveryNode, String reason) {
nodeRemovalExecutor,
nodeRemovalExecutor
);
String reasonToPublish = switch (reason) {
case NODE_LEFT_REASON_DISCONNECTED -> "disconnected";
case NODE_LEFT_REASON_LAGGING -> "lagging";
case NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL -> "follower.check.fail";
case NODE_LEFT_REASON_HEALTHCHECK_FAIL -> "health.check.fail";
default -> reason;
};
clusterManagerMetrics.incrementCounter(
clusterManagerMetrics.nodeLeftCounter,
1.0,
Optional.ofNullable(
Tags.create().addTag(FOLLOWER_NODE_ID_TAG, discoveryNode.getId()).addTag(REASON_TAG, reasonToPublish)
)
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class FollowersChecker {
private static final Logger logger = LogManager.getLogger(FollowersChecker.class);

public static final String FOLLOWER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/follower_check";
public static final String NODE_LEFT_REASON_LAGGING = "lagging";
public static final String NODE_LEFT_REASON_DISCONNECTED = "disconnected";
public static final String NODE_LEFT_REASON_HEALTHCHECK_FAIL = "health check failed";
public static final String NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL = "followers check retry count exceeded";

// the time between checks sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
Expand Down Expand Up @@ -398,13 +402,13 @@ public void handleException(TransportException exp) {
final String reason;
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
reason = NODE_LEFT_REASON_DISCONNECTED;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.info(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp);
reason = "health check failed";
reason = NODE_LEFT_REASON_HEALTHCHECK_FAIL;
} else if (failureCountSinceLastSuccess >= followerCheckRetryCount) {
logger.info(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp);
reason = "followers check retry count exceeded";
reason = NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL;
} else {
logger.info(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@
import org.opensearch.gateway.GatewayService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.telemetry.TestInMemoryCounter;
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -250,6 +254,25 @@ public void testUnhealthyNodesGetsRemoved() {
assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3));
assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode1.getId()));
assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode2.getId()));

TestInMemoryMetricsRegistry clusterManagerMetricsRegistry = leader.getMetricsRegistry();
TestInMemoryCounter nodeLeftCounter = clusterManagerMetricsRegistry.getCounterStore().get("node.left.count");
assertNotNull("node.left.count counter should be present", nodeLeftCounter);
ConcurrentHashMap<Map<String, ?>, Double> counterValuesByTags = nodeLeftCounter.getCounterValueForTags();

// Check for newNode1
Map<String, Object> tags1 = new HashMap<>();
tags1.put("follower_node_id", newNode1.getId());
tags1.put("reason", "health.check.fail");
assertTrue(counterValuesByTags.containsKey(tags1));
assertEquals(Double.valueOf(1.0), counterValuesByTags.get(tags1));

// Check for newNode2
Map<String, Object> tags2 = new HashMap<>();
tags2.put("follower_node_id", newNode2.getId());
tags2.put("reason", "health.check.fail");
assertTrue(counterValuesByTags.containsKey(tags2));
assertEquals(Double.valueOf(1.0), counterValuesByTags.get(tags2));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.TestInMemoryMetricsRegistry;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.ThreadPool.Names;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.TestInMemoryMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.ThreadPool.Names;
Expand Down
1 change: 1 addition & 0 deletions test/framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies {
api project(':libs:opensearch-nio')
api project(":server")
api project(":libs:opensearch-cli")
api project(":libs:opensearch-telemetry")
api project(":test:telemetry")
api "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
api "junit:junit:${versions.junit}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.disruption.DisruptableMockTransport;
import org.opensearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -1059,6 +1060,7 @@ class ClusterNode {
private RepositoriesService repositoriesService;
private RemoteStoreNodeService remoteStoreNodeService;
List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
private TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry();

ClusterNode(int nodeIndex, boolean clusterManagerEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
this(
Expand Down Expand Up @@ -1188,7 +1190,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService,
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE),
new ClusterManagerMetrics(metricsRegistry),
null
);
coordinator.setNodeConnectionsService(nodeConnectionsService);
Expand All @@ -1211,6 +1213,10 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
coordinator.startInitialJoin();
}

/**
 * Returns the in-memory metrics registry owned by this cluster node.
 * This is the same registry instance that is handed to {@code ClusterManagerMetrics}
 * when the node's coordinator is built, so tests can inspect the metrics it recorded.
 *
 * @return this node's {@link TestInMemoryMetricsRegistry}
 */
public TestInMemoryMetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}

void close() {
assertThat("must add nodes to a cluster before closing them", clusterNodes, hasItem(ClusterNode.this));
onNode(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
* compatible open source license.
*/

package org.opensearch.telemetry;
package org.opensearch.test.telemetry;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -24,13 +24,26 @@
public class TestInMemoryCounter implements Counter {

private AtomicInteger counterValue = new AtomicInteger(0);
private ConcurrentHashMap<HashMap<String, ?>, Double> counterValueForTags = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Map<String, ?>, Double> counterValueForTags = new ConcurrentHashMap<>();

/**
 * Creates an in-memory counter whose {@code counterValue} starts at zero
 * and whose per-tag value map starts empty.
 */
public TestInMemoryCounter() {}

/**
 * Returns the current value of the {@code counterValue} field.
 * NOTE(review): presumably this is the total accumulated through the untagged
 * {@code add(double)} overload; its body is not visible here, so confirm.
 *
 * @return the integer currently held by {@code counterValue}
 */
public Integer getCounterValue() {
return this.counterValue.get();
}

public ConcurrentHashMap<HashMap<String, ?>, Double> getCounterValueForTags() {
/**
 * Returns the per-tag counter values.
 * The returned map is the counter's own backing map, not a copy: each key is a
 * tag map as produced by {@code Tags.getTagsMap()} and each value is the sum of
 * the amounts added for that tag map.
 *
 * @return the backing map from tag map to accumulated value
 */
public ConcurrentHashMap<Map<String, ?>, Double> getCounterValueForTags() {
return this.counterValueForTags;
}

Expand All @@ -41,12 +54,12 @@ public void add(double value) {

@Override
public synchronized void add(double value, Tags tags) {
HashMap<String, ?> hashMap = (HashMap<String, ?>) tags.getTagsMap();
if (counterValueForTags.get(hashMap) == null) {
counterValueForTags.put(hashMap, value);
Map<String, ?> tagsMap = tags.getTagsMap();
if (counterValueForTags.get(tagsMap) == null) {
counterValueForTags.put(tagsMap, value);
} else {
value = counterValueForTags.get(hashMap) + value;
counterValueForTags.put(hashMap, value);
value = counterValueForTags.get(tagsMap) + value;
counterValueForTags.put(tagsMap, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.telemetry;
package org.opensearch.test.telemetry;

import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.tags.Tags;
Expand All @@ -26,10 +26,23 @@ public class TestInMemoryHistogram implements Histogram {
private AtomicInteger histogramValue = new AtomicInteger(0);
private ConcurrentHashMap<HashMap<String, ?>, Double> histogramValueForTags = new ConcurrentHashMap<>();

/**
 * Creates an in-memory histogram whose {@code histogramValue} starts at zero
 * and whose per-tag value map starts empty.
 */
public TestInMemoryHistogram() {}

/**
 * Returns the current value of the {@code histogramValue} field.
 * NOTE(review): how recorded samples update this field is not visible here; confirm
 * against the {@code record} implementations.
 *
 * @return the integer currently held by {@code histogramValue}
 */
public Integer getHistogramValue() {
return this.histogramValue.get();
}

/**
 * Returns the per-tag histogram values.
 * The returned map is the histogram's own backing map, not a copy; its keys are
 * {@code HashMap<String, ?>} tag maps.
 * NOTE(review): what each stored {@code Double} represents is not visible here; confirm
 * against the tagged {@code record} implementation.
 *
 * @return the backing map from tag map to recorded value
 */
public ConcurrentHashMap<HashMap<String, ?>, Double> getHistogramValueForTags() {
return this.histogramValueForTags;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.telemetry;
package org.opensearch.test.telemetry;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
Expand All @@ -29,10 +29,23 @@ public class TestInMemoryMetricsRegistry implements MetricsRegistry {
private ConcurrentHashMap<String, TestInMemoryCounter> counterStore = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, TestInMemoryHistogram> histogramStore = new ConcurrentHashMap<>();

/**
 * Creates a metrics registry whose counter store and histogram store both start empty.
 */
public TestInMemoryMetricsRegistry() {}

/**
 * Returns the store of counters held by this registry.
 * The returned map is the registry's own backing map, not a copy. Tests look counters
 * up in it by metric name (for example {@code "node.left.count"}).
 *
 * @return the backing map from string key to {@link TestInMemoryCounter}
 */
public ConcurrentHashMap<String, TestInMemoryCounter> getCounterStore() {
return this.counterStore;
}

/**
 * Returns the store of histograms held by this registry.
 * The returned map is the registry's own backing map, not a copy.
 * NOTE(review): presumably keyed by histogram name, mirroring the counter store; confirm.
 *
 * @return the backing map from string key to {@link TestInMemoryHistogram}
 */
public ConcurrentHashMap<String, TestInMemoryHistogram> getHistogramStore() {
return this.histogramStore;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** In-memory telemetry test doubles (counter, histogram and metrics registry) for use in OpenSearch tests. */
package org.opensearch.test.telemetry;
Loading