Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class ClusterManagerMetrics {
public final Counter asyncFetchFailureCounter;
public final Counter asyncFetchSuccessCounter;
public final Counter nodeLeftCounter;
public final Counter fsHealthFailCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
Expand Down Expand Up @@ -87,6 +88,11 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
COUNTER_METRICS_UNIT
);
nodeLeftCounter = metricsRegistry.createCounter("node.left.count", "Counter for node left operation", COUNTER_METRICS_UNIT);
fsHealthFailCounter = metricsRegistry.createCounter(
"fsHealth.failure.count",
"Counter for number of times FS health check has failed",
COUNTER_METRICS_UNIT
);
}

public void recordLatency(Histogram histogram, Double value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -55,12 +57,14 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.cluster.ClusterManagerMetrics.NODE_ID_TAG;
import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;

Expand All @@ -74,6 +78,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH

private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private final ClusterManagerMetrics clusterManagerMetrics;
private volatile boolean enabled;
private volatile boolean brokenLock;
private final TimeValue refreshInterval;
Expand Down Expand Up @@ -115,14 +120,21 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
Setting.Property.Dynamic
);

public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) {
public FsHealthService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
NodeEnvironment nodeEnv,
ClusterManagerMetrics clusterManagerMetrics
) {
this.threadPool = threadPool;
this.enabled = ENABLED_SETTING.get(settings);
this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings);
this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis;
this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings);
this.nodeEnv = nodeEnv;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold);
clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled);
Expand Down Expand Up @@ -198,13 +210,25 @@ public void run() {
} catch (Exception e) {
logger.error("health check failed", e);
} finally {
emitMetric();
if (checkEnabled) {
boolean completed = checkInProgress.compareAndSet(true, false);
assert completed;
}
}
}

private void emitMetric() {
StatusInfo healthStatus = getHealth();
if (healthStatus.getStatus() == UNHEALTHY) {
clusterManagerMetrics.incrementCounter(
clusterManagerMetrics.fsHealthFailCounter,
1.0,
Optional.ofNullable(Tags.create().addTag(NODE_ID_TAG, nodeEnv.nodeId()))
);
}
}

private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
Path[] paths = null;
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
settings,
clusterService.getClusterSettings(),
threadPool,
nodeEnvironment
nodeEnvironment,
clusterManagerMetrics
);
final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.tests.mockfile.FilterFileChannel;
import org.apache.lucene.tests.mockfile.FilterFileSystemProvider;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.DeterministicTaskQueue;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.io.PathUtilsForTesting;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.TestInMemoryMetricsRegistry;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
Expand All @@ -57,6 +59,7 @@
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -71,19 +74,29 @@
public class FsHealthServiceTests extends OpenSearchTestCase {

private DeterministicTaskQueue deterministicTaskQueue;
private TestInMemoryMetricsRegistry metricsRegistry;
private ClusterManagerMetrics clusterManagerMetrics;

@Before
public void createObjects() {
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
metricsRegistry = new TestInMemoryMetricsRegistry();
clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry);
}

public void testSchedulesHealthCheckAtRefreshIntervals() throws Exception {
long refreshInterval = randomLongBetween(1000, 12000);
final Settings settings = Settings.builder().put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms").build();
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), env);
FsHealthService fsHealthService = new FsHealthService(
settings,
clusterSettings,
deterministicTaskQueue.getThreadPool(),
env,
clusterManagerMetrics
);
final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis();
fsHealthService.doStart();
assertFalse(deterministicTaskQueue.hasRunnableTasks());
Expand Down Expand Up @@ -117,17 +130,24 @@ public void testFailsHealthOnIOException() throws IOException {
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());

// disrupt file system
disruptFileSystemProvider.restrictPathPrefix(""); // disrupt all paths
disruptFileSystemProvider.injectIOException.set(true);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertEquals(
Double.valueOf(1),
metricsRegistry.getCounterStore()
.get("fsHealth.failure.count")
.getCounterValueForTags()
.get((Map.of("node_id", env.nodeId())))
);
for (Path path : env.nodeDataPaths()) {
assertTrue(fsHealthService.getHealth().getInfo().contains(path.toString()));
}
Expand Down Expand Up @@ -160,7 +180,7 @@ public void testLoggingOnHungIO() throws Exception {
MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(FsHealthService.class));
NodeEnvironment env = newNodeEnvironment()
) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
int counter = 0;
for (Path path : env.nodeDataPaths()) {
mockAppender.addExpectation(
Expand Down Expand Up @@ -202,7 +222,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception {
PathUtilsForTesting.installMock(fileSystem);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
logger.info("--> Initial health status prior to the first monitor run");
StatusInfo fsHealth = fsHealthService.getHealth();
assertEquals(HEALTHY, fsHealth.getStatus());
Expand All @@ -214,7 +234,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception {
assertEquals("health check passed", fsHealth.getInfo());
logger.info("--> Disrupt file system");
disruptFileSystemProvider.injectIODelay.set(true);
final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env);
final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthSrvc.doStart();
waitUntil(
() -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY,
Expand Down Expand Up @@ -254,7 +274,7 @@ public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
try (NodeEnvironment env = newNodeEnvironment()) {
Path[] paths = env.nodeDataPaths();
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());
Expand All @@ -263,9 +283,16 @@ public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
disruptFsyncFileSystemProvider.injectIOException.set(true);
String disruptedPath = randomFrom(paths).toString();
disruptFsyncFileSystemProvider.restrictPathPrefix(disruptedPath);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertEquals(
Double.valueOf(1),
metricsRegistry.getCounterStore()
.get("fsHealth.failure.count")
.getCounterValueForTags()
.get((Map.of("node_id", env.nodeId())))
);
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]"));
assertEquals(1, disruptFsyncFileSystemProvider.getInjectedPathCount());
} finally {
Expand All @@ -285,7 +312,7 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
try (NodeEnvironment env = newNodeEnvironment()) {
Path[] paths = env.nodeDataPaths();
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());
Expand All @@ -294,7 +321,7 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
String disruptedPath = randomFrom(paths).toString();
disruptWritesFileSystemProvider.restrictPathPrefix(disruptedPath);
disruptWritesFileSystemProvider.injectIOException.set(true);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]"));
Expand All @@ -319,17 +346,24 @@ public void testFailsHealthOnUnexpectedLockFileSize() throws IOException {
PathUtilsForTesting.installMock(fileSystem);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());

// enabling unexpected file size injection
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true);

fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, clusterManagerMetrics);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertEquals(
Double.valueOf(1),
metricsRegistry.getCounterStore()
.get("fsHealth.failure.count")
.getCounterValueForTags()
.get((Map.of("node_id", env.nodeId())))
);
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed due to broken node lock"));
assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount());
} finally {
Expand Down