Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
*/
public final class ClusterManagerMetrics {

public static final String NODE_ID_TAG = "node_id";
public static final String REASON_TAG = "reason";
private static final String LATENCY_METRIC_UNIT_MS = "ms";
private static final String COUNTER_METRICS_UNIT = "1";

Expand All @@ -36,6 +38,8 @@ public final class ClusterManagerMetrics {
public final Counter followerChecksFailureCounter;
public final Counter asyncFetchFailureCounter;
public final Counter asyncFetchSuccessCounter;
public final Counter nodeLeftCounter;
public final Counter fsHealthFailCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
Expand Down Expand Up @@ -83,7 +87,12 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
"Counter for number of successful async fetches",
COUNTER_METRICS_UNIT
);

nodeLeftCounter = metricsRegistry.createCounter("node.left.count", "Counter for node left operation", COUNTER_METRICS_UNIT);
fsHealthFailCounter = metricsRegistry.createCounter(
"fsHealth.failure.count",
"Counter for number of times FS health check has failed",
COUNTER_METRICS_UNIT
);
}

public void recordLatency(Histogram histogram, Double value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand All @@ -111,6 +112,12 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.ClusterManagerMetrics.NODE_ID_TAG;
import static org.opensearch.cluster.ClusterManagerMetrics.REASON_TAG;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_DISCONNECTED;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_HEALTHCHECK_FAIL;
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_LAGGING;
import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
Expand Down Expand Up @@ -193,6 +200,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final ClusterSettings clusterSettings;
private final ClusterManagerMetrics clusterManagerMetrics;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -250,6 +258,7 @@ public Coordinator(
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout);
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.clusterManagerMetrics = clusterManagerMetrics;
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(
Expand Down Expand Up @@ -359,6 +368,18 @@ private void removeNode(DiscoveryNode discoveryNode, String reason) {
nodeRemovalExecutor,
nodeRemovalExecutor
);
String reasonToPublish = switch (reason) {
case NODE_LEFT_REASON_DISCONNECTED -> "disconnected";
case NODE_LEFT_REASON_LAGGING -> "lagging";
case NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL -> "follower.check.fail";
case NODE_LEFT_REASON_HEALTHCHECK_FAIL -> "health.check.fail";
default -> reason;
};
clusterManagerMetrics.incrementCounter(
clusterManagerMetrics.nodeLeftCounter,
1.0,
Optional.ofNullable(Tags.create().addTag(NODE_ID_TAG, discoveryNode.getId()).addTag(REASON_TAG, reasonToPublish))
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class FollowersChecker {
private static final Logger logger = LogManager.getLogger(FollowersChecker.class);

public static final String FOLLOWER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/follower_check";
public static final String NODE_LEFT_REASON_LAGGING = "lagging";
public static final String NODE_LEFT_REASON_DISCONNECTED = "disconnected";
public static final String NODE_LEFT_REASON_HEALTHCHECK_FAIL = "health check failed";
public static final String NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL = "followers check retry count exceeded";

// the time between checks sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
Expand Down Expand Up @@ -398,13 +402,13 @@ public void handleException(TransportException exp) {
final String reason;
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
reason = NODE_LEFT_REASON_DISCONNECTED;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.info(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp);
reason = "health check failed";
reason = NODE_LEFT_REASON_HEALTHCHECK_FAIL;
} else if (failureCountSinceLastSuccess >= followerCheckRetryCount) {
logger.info(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp);
reason = "followers check retry count exceeded";
reason = NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL;
} else {
logger.info(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -55,12 +57,14 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.cluster.ClusterManagerMetrics.NODE_ID_TAG;
import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;

Expand All @@ -74,6 +78,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH

private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private final ClusterManagerMetrics clusterManagerMetrics;
private volatile boolean enabled;
private volatile boolean brokenLock;
private final TimeValue refreshInterval;
Expand Down Expand Up @@ -115,14 +120,21 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
Setting.Property.Dynamic
);

public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) {
public FsHealthService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
NodeEnvironment nodeEnv,
ClusterManagerMetrics clusterManagerMetrics
) {
this.threadPool = threadPool;
this.enabled = ENABLED_SETTING.get(settings);
this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings);
this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis;
this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings);
this.nodeEnv = nodeEnv;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold);
clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled);
Expand Down Expand Up @@ -198,13 +210,25 @@ public void run() {
} catch (Exception e) {
logger.error("health check failed", e);
} finally {
emitMetric();
if (checkEnabled) {
boolean completed = checkInProgress.compareAndSet(true, false);
assert completed;
}
}
}

private void emitMetric() {
StatusInfo healthStatus = getHealth();
if (healthStatus.getStatus() == UNHEALTHY) {
clusterManagerMetrics.incrementCounter(
clusterManagerMetrics.fsHealthFailCounter,
1.0,
Optional.ofNullable(Tags.create().addTag(NODE_ID_TAG, nodeEnv.nodeId()))
);
}
}

private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
Path[] paths = null;
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
settings,
clusterService.getClusterSettings(),
threadPool,
nodeEnvironment
nodeEnvironment,
clusterManagerMetrics
);
final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
Expand Down
Loading
Loading