HBASE-21688: Address WAL filesystem issues
Signed-off-by: Josh Elser <elserj@apache.org>
Vladimir Rodionov authored and joshelser committed Jan 8, 2019
1 parent 7d6ce35 commit d26acbe
Showing 12 changed files with 55 additions and 47 deletions.
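Across these files the commit swaps FSUtils.getRootDir(conf) for CommonFSUtils.getWALRootDir(conf) (and the matching WAL filesystem) so that the WALs and oldWALs paths are resolved against hbase.wal.dir, which may live on a different filesystem than hbase.rootdir. A minimal standalone sketch of that pattern, assuming an HBase 2.x classpath; the class and method names in the sketch are illustrative and not part of this commit:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public class WalDirResolution {
  // Resolve the live and archived WAL directories against the WAL root
  // (hbase.wal.dir when set, otherwise hbase.rootdir), and obtain the
  // FileSystem from that path instead of assuming the root filesystem.
  public static void printWalDirs(Configuration conf) throws IOException {
    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
    FileSystem walFs = walRootDir.getFileSystem(conf);

    Path walDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
    Path oldWalDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);

    System.out.println("WALs:    " + walFs.makeQualified(walDir));
    System.out.println("oldWALs: " + walFs.makeQualified(oldWalDir));
  }
}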
[Changed file 1 of 12]
@@ -39,7 +39,7 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -243,10 +243,11 @@ private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestam
throws IOException {
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+ "\n newestTimestamps: " + newestTimestamps);
Path rootdir = FSUtils.getRootDir(conf);
Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem fs = rootdir.getFileSystem(conf);

Path walRootDir = CommonFSUtils.getWALRootDir(conf);
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem fs = walRootDir.getFileSystem(conf);
NewestLogFilter pathFilter = new NewestLogFilter();

List<String> resultLogFiles = new ArrayList<>();
[Changed file 2 of 12]
@@ -42,7 +42,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
@@ -151,8 +151,8 @@ private List<String> convert(List<FileStatus> walFiles) {
}

private List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
Path logRoot = new Path(FSUtils.getRootDir(c), HConstants.HREGION_LOGDIR_NAME);
FileSystem fs = FileSystem.get(c);
Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
FileSystem fs = logRoot.getFileSystem(c);
RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
List<FileStatus> logFiles = new ArrayList<FileStatus>();
while (it.hasNext()) {
[Changed file 3 of 12]
@@ -79,6 +79,7 @@
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Random64;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -981,10 +982,11 @@ public int run(Path inputDir, int numMappers) throws Exception {
if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walsDir = new Path(
CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(
CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
" against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
[Changed file 4 of 12]
@@ -57,6 +57,7 @@
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -506,10 +507,10 @@ private int doSearch(Configuration conf, String keysDir) throws Exception {
if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs. In two dirs. Presumes certain layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(
CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +
" against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
[Changed file 5 of 12]
@@ -24,7 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;

/**
* WALLink describes a link to a WAL.
@@ -45,7 +45,7 @@ public class WALLink extends FileLink {
*/
public WALLink(final Configuration conf,
final String serverName, final String logName) throws IOException {
this(FSUtils.getWALRootDir(conf), serverName, logName);
this(CommonFSUtils.getWALRootDir(conf), serverName, logName);
}

/**
[Changed file 6 of 12]
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -187,8 +188,9 @@ public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) thr
* @return List of all RegionServer WAL dirs; i.e. this.rootDir/HConstants.HREGION_LOGDIR_NAME.
*/
public FileStatus[] getWALDirPaths(final PathFilter filter) throws IOException {
Path walDirPath = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
FileStatus[] walDirForServerNames = FSUtils.listStatus(fs, walDirPath, filter);
Path walDirPath = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
FileStatus[] walDirForServerNames = FSUtils.listStatus(
CommonFSUtils.getWALFileSystem(conf), walDirPath, filter);
return walDirForServerNames == null? new FileStatus[0]: walDirForServerNames;
}

@@ -201,12 +203,12 @@ public FileStatus[] getWALDirPaths(final PathFilter filter) throws IOException {
* it.
*/
@Deprecated
public Set<ServerName> getFailedServersFromLogFolders() {
public Set<ServerName> getFailedServersFromLogFolders() throws IOException {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);

Set<ServerName> serverNames = new HashSet<>();
Path logsDirPath = new Path(this.rootDir, HConstants.HREGION_LOGDIR_NAME);
Path logsDirPath = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);

do {
if (services.isStopped()) {
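The hunks above make getWALDirPaths() list per-RegionServer directories on the WAL filesystem, and getFailedServersFromLogFolders() now declares throws IOException because resolving the WAL root via CommonFSUtils.getWALRootDir(conf) can throw it. A standalone sketch of the same listing, again assuming an HBase 2.x classpath; the class name here is illustrative:

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public class ListServerWalDirs {
  // Enumerate the per-RegionServer WAL directories under <walRootDir>/WALs
  // on the WAL filesystem, not on the filesystem backing hbase.rootdir.
  public static FileStatus[] listWalDirs(Configuration conf) throws IOException {
    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
    Path walDirPath = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
    try {
      return walFs.listStatus(walDirPath);
    } catch (FileNotFoundException e) {
      // FSUtils.listStatus returns null for a missing directory; map that to empty here.
      return new FileStatus[0];
    }
  }
}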
[Changed file 7 of 12]
@@ -124,7 +124,7 @@ public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition,
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
[Changed file 8 of 12]
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -80,11 +81,11 @@ class WALEntryStream implements Closeable {
* @param metrics replication metrics
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics) throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.fs = CommonFSUtils.getWALFileSystem(conf);
this.conf = conf;
this.currentPositionOfEntry = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
@@ -312,10 +313,10 @@ private boolean openNextLog() throws IOException {
}

private Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path walRootDir = CommonFSUtils.getWALRootDir(conf);

// Try found the log in old dir
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
@@ -324,7 +325,7 @@ private Path getArchivedLog(Path path) throws IOException {

// Try found the log in the seperate old log dir
oldLogDir =
new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
@@ -381,7 +382,8 @@ private void openReader(Path path) throws IOException {
// For HBASE-15019
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);

final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
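With the constructor change above, WALEntryStream resolves its FileSystem from the configuration via CommonFSUtils.getWALFileSystem(conf) instead of taking one as an argument, and getArchivedLog() probes the oldWALs locations under the WAL root. A hedged usage sketch of the new signature; since WALEntryStream is package-private, this assumes code living in the same package (as the tests further down do), and the package declaration, class, and method names are assumptions for illustration only:

package org.apache.hadoop.hbase.replication.regionserver; // assumed: WALEntryStream is package-private

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.WAL;

class WALEntryStreamUsageSketch {
  // Drain a queue of WAL paths starting from a known position. The updated
  // constructor takes no FileSystem: the stream resolves the WAL filesystem
  // internally from conf.
  static void drainQueue(PriorityBlockingQueue<Path> logQueue, Configuration conf,
      long startPosition, WALFileLengthProvider walFileLengthProvider,
      ServerName serverName, MetricsSource metrics) throws IOException {
    try (WALEntryStream entryStream = new WALEntryStream(logQueue, conf, startPosition,
        walFileLengthProvider, serverName, metrics)) {
      while (entryStream.hasNext()) {
        WAL.Entry entry = entryStream.next();
        // process the entry ...
      }
    }
  }
}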
[Changed file 9 of 12]
@@ -1680,9 +1680,9 @@ public boolean rebuildMeta(boolean fix) throws IOException,
* Meta recovery WAL directory inside WAL directory path.
*/
private void removeHBCKMetaRecoveryWALDir(String walFactoryId) throws IOException {
Path rootdir = FSUtils.getRootDir(getConf());
Path walLogDir = new Path(new Path(rootdir, HConstants.HREGION_LOGDIR_NAME), walFactoryId);
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
Path walLogDir = new Path(new Path(CommonFSUtils.getWALRootDir(getConf()),
HConstants.HREGION_LOGDIR_NAME), walFactoryId);
FileSystem fs = CommonFSUtils.getWALFileSystem(getConf());
FileStatus[] walFiles = FSUtils.listStatus(fs, walLogDir, null);
if (walFiles == null || walFiles.length == 0) {
LOG.info("HBCK meta recovery WAL directory is empty, removing it now.");
[Changed file 10 of 12]
@@ -141,7 +141,7 @@ public void testHBaseCluster() throws Exception {

// Now we need to find the log file, its locations, and look at it

String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
String rootDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
"/" + targetRs.getServerName().toString()).toUri().getPath();

DistributedFileSystem mdfs = (DistributedFileSystem)
[Changed file 11 of 12]
@@ -420,8 +420,8 @@ public void testDelayedDeleteOnFailure() throws Exception {
startCluster(1);
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
final Path logDir = new Path(new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
ServerName.valueOf("x", 1, 1).toString());
final Path rootLogDir = new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
fs.mkdirs(logDir);
ExecutorService executor = null;
try {
[Changed file 12 of 12]
@@ -156,7 +156,7 @@ public void testDifferentCounts() throws Exception {
log.rollWriter();

try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@@ -183,7 +183,7 @@ public void testAppendsWithRolls() throws Exception {
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
@@ -197,7 +197,7 @@

appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
log, null, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
@@ -211,7 +211,7 @@ log, null, new MetricsSource("1"))) {
log.rollWriter();
appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
log, null, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@@ -237,7 +237,7 @@ public void testLogrollWhileStreaming() throws Exception {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));

appendToLog("3"); // 3 - comes in after reader opened
@@ -262,7 +262,7 @@ public void testLogrollWhileStreaming() throws Exception {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point

// some new entries come in while we're streaming
@@ -285,15 +285,15 @@ public void testResumeStreamingFromPosition() throws Exception {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
lastPosition = entryStream.getPosition();
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@@ -310,14 +310,14 @@ public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
// read only one element
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
log, null, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@@ -328,7 +328,7 @@ log, null, new MetricsSource("1"))) {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
@@ -361,7 +361,7 @@ public void testReplicationSourceWALReader() throws Exception {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -476,7 +476,7 @@ public void testReplicationSourceWALReaderDisabled()
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -592,7 +592,7 @@ public void testReadBeyondCommittedLength() throws IOException, InterruptedExcep
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0,
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
