Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private volatile boolean enabled;
private volatile boolean brokenLock;
private final TimeValue refreshInterval;
private volatile TimeValue slowPathLoggingThreshold;
private final NodeEnvironment nodeEnv;
Expand Down Expand Up @@ -117,6 +118,8 @@ public StatusInfo getHealth() {
Set<Path> unhealthyPaths = this.unhealthyPaths;
if (enabled == false) {
statusInfo = new StatusInfo(HEALTHY, "health check disabled");
} else if (brokenLock) {
statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
} else if (unhealthyPaths == null) {
statusInfo = new StatusInfo(HEALTHY, "health check passed");
} else {
Expand Down Expand Up @@ -150,7 +153,16 @@ public void run() {

private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
for (Path path : nodeEnv.nodeDataPaths()) {
Path[] paths = null;
try {
paths = nodeEnv.nodeDataPaths();
} catch (IllegalStateException e) {
logger.error("health check failed", e);
brokenLock = true;
return;
}

for (Path path : paths) {
long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
Expand All @@ -176,6 +188,7 @@ private void monitorFSHealth() {
}
}
unhealthyPaths = currentUnhealthyPaths;
brokenLock = false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -231,6 +231,36 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
}
}

/**
 * Checks that the health service reports {@code UNHEALTHY} with the
 * "broken node lock" message once the mocked file system starts returning
 * an injected size (1) for the node lock file, and that it reports
 * {@code HEALTHY} while the injection is switched off.
 */
public void testFailsHealthOnUnexpectedLockFileSize() throws IOException {
    final FileSystem defaultFileSystem = PathUtils.getDefaultFileSystem();
    final Settings nodeSettings = Settings.EMPTY;
    final TestThreadPool pool = new TestThreadPool(getClass().getName(), nodeSettings);
    final FileSystemUnexpectedLockFileSizeProvider lockSizeProvider =
        new FileSystemUnexpectedLockFileSizeProvider(defaultFileSystem, 1, pool);
    // Route all path operations through the size-disrupting provider.
    final FileSystem disruptingFileSystem = lockSizeProvider.getFileSystem(null);
    PathUtilsForTesting.installMock(disruptingFileSystem);
    final ClusterSettings builtInClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
        // Baseline: with injection off, a monitor run leaves the service healthy.
        FsHealthService healthService = new FsHealthService(nodeSettings, builtInClusterSettings, pool, nodeEnvironment);
        healthService.new FsHealthMonitor().run();
        assertEquals(HEALTHY, healthService.getHealth().getStatus());
        assertEquals("health check passed", healthService.getHealth().getInfo());

        // enabling unexpected file size injection
        lockSizeProvider.injectUnexpectedFileSize.set(true);

        // A fresh service over the same environment must now see the broken lock.
        healthService = new FsHealthService(nodeSettings, builtInClusterSettings, pool, nodeEnvironment);
        healthService.new FsHealthMonitor().run();
        assertEquals(UNHEALTHY, healthService.getHealth().getStatus());
        assertThat(healthService.getHealth().getInfo(), is("health check failed due to broken node lock"));
        assertEquals(1, lockSizeProvider.getInjectedPathCount());
    } finally {
        // Switch the disruption off before tearing down the mock file system and the pool.
        lockSizeProvider.injectUnexpectedFileSize.set(false);
        PathUtilsForTesting.teardown();
        ThreadPool.terminate(pool, 500, TimeUnit.MILLISECONDS);
    }
}

private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider {

AtomicBoolean injectIOException = new AtomicBoolean();
Expand All @@ -254,7 +284,8 @@ public int getInjectedPathCount(){
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
if (injectIOException.get()){
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
if (path.toString().startsWith(pathPrefix) && path.toString().
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
Expand Down Expand Up @@ -289,7 +320,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
public void force(boolean metaData) throws IOException {
if (injectIOException.get()) {
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
if (path.toString().startsWith(pathPrefix) && path.toString().
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
Expand Down Expand Up @@ -341,4 +373,39 @@ public void force(boolean metaData) throws IOException {
};
}
}

/**
 * Filtering file system provider used to simulate a node lock file with an
 * unexpected size. While {@link #injectUnexpectedFileSize} is set, any channel
 * opened through this provider on a file named
 * {@code NodeEnvironment.NODE_LOCK_FILENAME} reports the configured size
 * instead of the real one; every such substitution is counted.
 */
private static class FileSystemUnexpectedLockFileSizeProvider extends FilterFileSystemProvider {

    /** Toggle for the size substitution; off by default. */
    AtomicBoolean injectUnexpectedFileSize = new AtomicBoolean();
    /** Number of times a substituted size has been returned. */
    AtomicInteger injectedPaths = new AtomicInteger();

    /** The size reported for the node lock file while injection is enabled. */
    private final long size;
    /** Thread pool handed in by the test; not read anywhere in this class. */
    private final ThreadPool threadPool;

    /**
     * @param inner      the file system to delegate to
     * @param size       the size to report for the node lock file when injecting
     * @param threadPool the test's thread pool
     */
    FileSystemUnexpectedLockFileSizeProvider(FileSystem inner, long size, ThreadPool threadPool) {
        super("disrupt_fs_health://", inner);
        this.size = size;
        this.threadPool = threadPool;
    }

    /** @return how many times an injected size has been served so far */
    public int getInjectedPathCount() {
        return this.injectedPaths.get();
    }

    /**
     * Opens the channel through the delegate and wraps it so that
     * {@code size()} can be overridden for the node lock file.
     */
    @Override
    public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
        final FileChannel delegate = super.newFileChannel(path, options, attrs);
        return new FilterFileChannel(delegate) {
            @Override
            public long size() throws IOException {
                // The file name is only inspected when injection is switched on.
                final boolean substitute = injectUnexpectedFileSize.get()
                    && path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME);
                if (substitute == false) {
                    return super.size();
                }
                injectedPaths.incrementAndGet();
                return size;
            }
        };
    }
}
}