Skip to content

HBASE-28898. Use reflection to access recoverLease(), setSafeMode() APIs. (#6342) #6390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -39,6 +40,40 @@ public final class RecoverLeaseFSUtils {

private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);

private static Class<?> leaseRecoverableClazz = null;
private static Method recoverLeaseMethod = null;
public static final String LEASE_RECOVERABLE_CLASS_NAME = "org.apache.hadoop.fs.LeaseRecoverable";
static {
LOG.debug("Initialize RecoverLeaseFSUtils");
initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
}

/**
* Initialize reflection classes and methods. If LeaseRecoverable class is not found, look for
* DistributedFilSystem#recoverLease method.
*/
static void initializeRecoverLeaseMethod(String className) {
try {
leaseRecoverableClazz = Class.forName(className);
recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class);
LOG.debug("set recoverLeaseMethod to " + className + ".recoverLease()");
} catch (ClassNotFoundException e) {
LOG.debug(
"LeaseRecoverable interface not in the classpath, this means Hadoop 3.3.5 or below.");
try {
recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease", Path.class);
} catch (NoSuchMethodException ex) {
LOG.error("Cannot find recoverLease method in DistributedFileSystem class. "
+ "It should never happen. Abort.", ex);
throw new RuntimeException(ex);
}
} catch (NoSuchMethodException e) {
LOG.error("Cannot find recoverLease method in LeaseRecoverable class. "
+ "It should never happen. Abort.", e);
throw new RuntimeException(e);
}
}

private RecoverLeaseFSUtils() {
}

Expand All @@ -48,18 +83,31 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
}

/**
* Recover the lease from HDFS, retrying multiple times.
* Recover the lease from Hadoop file system, retrying multiple times.
*/
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
CancelableProgressable reporter) throws IOException {
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}

// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) {
return;
if (isLeaseRecoverable(fs)) {
recoverDFSFileLease(fs, p, conf, reporter);
}
recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
}

public static boolean isLeaseRecoverable(FileSystem fs) {
// return true if HDFS.
if (fs instanceof DistributedFileSystem) {
return true;
}
// return true if the file system implements LeaseRecoverable interface.
if (leaseRecoverableClazz != null) {
return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
}
// return false if the file system is not HDFS and does not implement LeaseRecoverable.
return false;
}

/*
Expand All @@ -81,7 +129,7 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
* false, repeat starting at step 5. above. If HDFS-4525 is available, call it every second, and
* we might be able to exit early.
*/
private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter) throws IOException {
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
Expand Down Expand Up @@ -167,21 +215,25 @@ private static boolean checkIfTimedout(final Configuration conf, final long reco
* Try to recover the lease.
* @return True if dfs#recoverLease came by true.
*/
private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt,
final Path p, final long startWaiting) throws FileNotFoundException {
private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p,
final long startWaiting) throws FileNotFoundException {
boolean recovered = false;
try {
recovered = dfs.recoverLease(p);
recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
+ getLogMessageDetail(nbAttempt, p, startWaiting));
} catch (IOException e) {
} catch (InvocationTargetException ite) {
final Throwable e = ite.getCause();
if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
// This exception comes out instead of FNFE, fix it
throw new FileNotFoundException("The given WAL wasn't found at " + p);
} else if (e instanceof FileNotFoundException) {
throw (FileNotFoundException) e;
}
LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
} catch (IllegalAccessException e) {
LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
throw new RuntimeException(e);
}
return recovered;
}
Expand All @@ -197,8 +249,7 @@ private static String getLogMessageDetail(final int nbAttempt, final Path p,
* Call HDFS-4525 isFileClosed if it is available.
* @return True if file is closed.
*/
private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m,
final Path p) {
private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) {
try {
return (Boolean) m.invoke(dfs, p);
} catch (SecurityException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
*/
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
Expand All @@ -30,7 +38,6 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
* Test our recoverLease loop against mocked up filesystem.
Expand Down Expand Up @@ -58,38 +65,84 @@ public class TestRecoverLeaseFSUtils {
public void testRecoverLease() throws IOException {
long startTime = EnvironmentEdgeManager.currentTime();
HTU.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
Mockito.when(reporter.progress()).thenReturn(true);
DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
CancelableProgressable reporter = mock(CancelableProgressable.class);
when(reporter.progress()).thenReturn(true);
DistributedFileSystem dfs = mock(DistributedFileSystem.class);
// Fail four times and pass on the fifth.
Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
.thenReturn(false).thenReturn(true);
RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
verify(dfs, times(5)).recoverLease(FILE);
// Make sure we waited at least hbase.lease.recovery.dfs.timeout * 3 (the first two
// invocations will happen pretty fast... the we fall into the longer wait loop).
assertTrue((EnvironmentEdgeManager.currentTime() - startTime)
> (3 * HTU.getConfiguration().getInt("hbase.lease.recovery.dfs.timeout", 61000)));
}

private interface FakeLeaseRecoverable {
@SuppressWarnings("unused")
boolean recoverLease(Path p) throws IOException;

@SuppressWarnings("unused")
boolean isFileClosed(Path p) throws IOException;
}

private static abstract class RecoverableFileSystem extends FileSystem
implements FakeLeaseRecoverable {
@Override
public boolean recoverLease(Path p) throws IOException {
return true;
}

@Override
public boolean isFileClosed(Path p) throws IOException {
return true;
}
}

/**
* Test that we can use reflection to access LeaseRecoverable methods.
*/
@Test
public void testLeaseRecoverable() throws IOException {
try {
// set LeaseRecoverable to FakeLeaseRecoverable for testing
RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName());
RecoverableFileSystem mockFS = mock(RecoverableFileSystem.class);
when(mockFS.recoverLease(FILE)).thenReturn(true);
RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, HTU.getConfiguration());
verify(mockFS, times(1)).recoverLease(FILE);

assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(mock(RecoverableFileSystem.class)));
} finally {
RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
}
}

/**
* Test that isFileClosed makes us recover lease faster.
*/
@Test
public void testIsFileClosed() throws IOException {
// Make this time long so it is plain we broke out because of the isFileClosed invocation.
HTU.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 100000);
CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
Mockito.when(reporter.progress()).thenReturn(true);
IsFileClosedDistributedFileSystem dfs = Mockito.mock(IsFileClosedDistributedFileSystem.class);
CancelableProgressable reporter = mock(CancelableProgressable.class);
when(reporter.progress()).thenReturn(true);
IsFileClosedDistributedFileSystem dfs = mock(IsFileClosedDistributedFileSystem.class);
// Now make it so we fail the first two times -- the two fast invocations, then we fall into
// the long loop during which we will call isFileClosed.... the next invocation should
// therefore return true if we are to break the loop.
Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
when(dfs.isFileClosed(FILE)).thenReturn(true);
RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
verify(dfs, times(2)).recoverLease(FILE);
verify(dfs, times(1)).isFileClosed(FILE);
}

@Test
public void testIsLeaseRecoverable() {
assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ public final class FSUtils {
// currently only used in testing. TODO refactor into a test class
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

private static Class<?> safeModeClazz = null;
private static Class<?> safeModeActionClazz = null;
private static Object safeModeGet = null;
{
try {
safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null);
} catch (ClassNotFoundException | NoSuchFieldException e) {
LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
} catch (IllegalAccessException e) {
LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. "
+ "Unexpected Hadoop version or messy classpath?", e);
throw new RuntimeException(e);
}
}

private FSUtils() {
}

Expand Down Expand Up @@ -248,8 +265,20 @@ public static void checkFileSystemAvailable(final FileSystem fs) throws IOExcept
* @param dfs A DistributedFileSystem object representing the underlying HDFS.
* @return whether we're in safe mode
*/
private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
return dfs.setSafeMode(SAFEMODE_GET, true);
private static boolean isInSafeMode(FileSystem dfs) throws IOException {
if (isDistributedFileSystem(dfs)) {
return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
} else {
try {
Object ret = dfs.getClass()
.getMethod("setSafeMode", new Class[] { safeModeActionClazz, Boolean.class })
.invoke(dfs, safeModeGet, true);
return (Boolean) ret;
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
LOG.error("The file system does not support setSafeMode(). Abort.", e);
throw new RuntimeException(e);
}
}
}

/**
Expand All @@ -258,9 +287,8 @@ private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOExceptio
public static void checkDfsSafeMode(final Configuration conf) throws IOException {
boolean isInSafeMode = false;
FileSystem fs = FileSystem.get(conf);
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
isInSafeMode = isInSafeMode(dfs);
if (supportSafeMode(fs)) {
isInSafeMode = isInSafeMode(fs);
}
if (isInSafeMode) {
throw new IOException("File system is in safemode, it can't be written now");
Expand Down Expand Up @@ -635,10 +663,11 @@ public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId
*/
public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) return;
DistributedFileSystem dfs = (DistributedFileSystem) fs;
if (!supportSafeMode(fs)) {
return;
}
// Make sure dfs is not in safe mode
while (isInSafeMode(dfs)) {
while (isInSafeMode(fs)) {
LOG.info("Waiting for dfs to exit safe mode...");
try {
Thread.sleep(wait);
Expand All @@ -649,6 +678,19 @@ public static void waitOnSafeMode(final Configuration conf, final long wait) thr
}
}

public static boolean supportSafeMode(FileSystem fs) {
// return true if HDFS.
if (fs instanceof DistributedFileSystem) {
return true;
}
// return true if the file system implements SafeMode interface.
if (safeModeClazz != null) {
return (safeModeClazz.isAssignableFrom(fs.getClass()));
}
// return false if the file system is not HDFS and does not implement SafeMode interface.
return false;
}

/**
* Checks if meta region exists
* @param fs file system
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
Expand Down Expand Up @@ -90,13 +91,23 @@ public void testIsHDFS() throws Exception {
try {
cluster = htu.startMiniDFSCluster(1);
assertTrue(CommonFSUtils.isHDFS(conf));
assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem()));
FSUtils.checkDfsSafeMode(conf);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}

@Test
public void testLocalFileSystemSafeMode() throws Exception {
conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
assertFalse(CommonFSUtils.isHDFS(conf));
assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf)));
FSUtils.checkDfsSafeMode(conf);
}

private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
FSDataOutputStream out = fs.create(file);
byte[] data = new byte[dataSize];
Expand Down