HBASE-23222 MOB compaction supportability improvements #763

Closed · wants to merge 1 commit
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BaseHFileCleanerDelegate} that prevents cleaning HFiles from a mob region.
 *
 * Keeps a map of table name strings to mob region name strings over the life of
 * a JVM instance. If there's churn of unique table names we'll eat memory until
 * Master restart.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ManualMobMaintHFileCleaner extends BaseHFileCleanerDelegate {

  private static final Logger LOG = LoggerFactory.getLogger(ManualMobMaintHFileCleaner.class);

  // We need to avoid making HRegionInfo objects for every table we check.
  private static final ConcurrentMap<TableName, String> MOB_REGIONS = new ConcurrentHashMap<>();

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    try {
      // if it's a directory, then it can be deleted
      if (fStat.isDirectory()) return true;

      Path file = fStat.getPath();

      // we need the table and region to determine if this is from a mob region
      // we don't need to worry about hfilelink back references, because the hfilelink cleaner will retain them.
      Path family = file.getParent();
      Path region = family.getParent();
      Path table = region.getParent();

      TableName tableName = FSUtils.getTableName(table);

      String mobRegion = MOB_REGIONS.get(tableName);
      if (mobRegion == null) {
        String tmp = MobUtils.getMobRegionInfo(tableName).getEncodedName();
        if (tmp == null) {
          LOG.error("couldn't determine mob region for table {} keeping files just in case.", tableName);
          return false;
        }
        mobRegion = MOB_REGIONS.putIfAbsent(tableName, tmp);
Contributor:
When are we removing elements from MOB_REGIONS?

Contributor:
I guess the intention here is to retain mob files indefinitely, for as long as this class is set in the hbase.master.hfilecleaner.plugins config property? That's my take from the class javadoc comment:

keeps a map of table name strings to mob region name strings over the life of a JVM instance

Contributor Author:
That's correct, we never do. I tried to warn about the consequences of this in the javadoc, namely that creating lots of short-lived tables with unique names will eat memory until a master restart happens.

Contributor Author:
Sound fine @meszibalu?

Contributor:
I haven't seen the javadoc :( Ok, it sounds good to me.

        // a return of null means that tmp is now in the map for future lookups.
        if (mobRegion == null) {
          mobRegion = tmp;
        }
        LOG.debug("Had to calculate name of mob region for table {} and it is {}", tableName, mobRegion);
      }

      boolean ret = !mobRegion.equals(region.getName());
      if (LOG.isDebugEnabled() && !ret) {
        LOG.debug("Keeping file '{}' because it is from mob dir", fStat.getPath());
      }
      return ret;
    } catch (RuntimeException e) {
      LOG.error("Failed to determine mob status of '{}', keeping it just in case.", fStat.getPath(), e);
      return false;
    }
  }

}
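
As the review thread above notes, the cleaner only takes effect while it is listed in the hbase.master.hfilecleaner.plugins property. Below is a minimal sketch, not part of this change, of wiring it in programmatically; it mirrors the plugin-list update in the test further down. The cleaner class and the HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS key come from this PR, while the example class itself is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.ManualMobMaintHFileCleaner;

// Illustrative only: register the MOB-aware cleaner next to the default TTL cleaner.
public class EnableManualMobCleanerExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Keep the TTL cleaner and append the new cleaner so archived mob hfiles are
    // retained instead of being deleted by the Master's HFile cleaner chore.
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        TimeToLiveHFileCleaner.class.getName() + "," + ManualMobMaintHFileCleaner.class.getName());
    System.out.println(conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS));
  }
}

On a real cluster the same value would normally be set in hbase-site.xml on the Master rather than in code.
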
@@ -855,11 +855,15 @@ public static void doMobCompaction(Configuration conf, FileSystem fs, TableName
// with major compaction in mob-enabled column.
try {
lock.acquire();
LOG.info("start MOB compaction of files for table='{}', column='{}', allFiles={}, " +
"compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
compactor.compact(allFiles);
} catch (Exception e) {
LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
+ " in the table " + tableName.getNameAsString(), e);
} finally {
LOG.info("end MOB compaction of files for table='{}', column='{}', allFiles={}, " +
"compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
lock.release();
}
}
@@ -305,9 +305,9 @@ protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
// all the files are selected
request.setCompactionType(CompactionType.ALL_FILES);
}
LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
+ totalDelFiles + " del files, " + selectedFileCount + " selected files, and "
+ irrelevantFileCount + " irrelevant files");
LOG.info("The compaction type is {}, the request has {} del files, {} selected files, and {} " +
"irrelevant files table '{}' and column '{}'", request.getCompactionType(), totalDelFiles,
selectedFileCount, irrelevantFileCount, tableName, column.getNameAsString());
return request;
}

@@ -347,10 +347,12 @@ protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
totalDelFileCount++;
}
}
LOG.info("After merging, there are " + totalDelFileCount + " del files");
LOG.info("After merging, there are {} del files. table='{}' column='{}'", totalDelFileCount,
tableName, column.getNameAsString());
// compact the mob files by partitions.
paths = compactMobFiles(request);
LOG.info("After compaction, there are " + paths.size() + " mob files");
LOG.info("After compaction, there are {} mob files. table='{}' column='{}'", paths.size(),
tableName, column.getNameAsString());
} finally {
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
closeStoreFileReaders(delPartition.getStoreFiles());
@@ -359,15 +361,17 @@ protected List<Path> performCompaction(PartitionedMobCompactionRequest request)

// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) {
LOG.info(
"After a mob compaction with all files selected, archiving the del files ");
LOG.info("After a mob compaction with all files selected, archiving the del files for " +
"table='{}' and column='{}'", tableName, column.getNameAsString());
for (CompactionDelPartition delPartition : request.getDelPartitions()) {
LOG.info(Objects.toString(delPartition.listDelFiles()));
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
delPartition.getStoreFiles());
} catch (IOException e) {
LOG.error("Failed to archive the del files " + delPartition.getStoreFiles(), e);
LOG.error("Failed to archive the del files {} for partition {} table='{}' and " +
"column='{}'", delPartition.getStoreFiles(), delPartition.getId(), tableName,
column.getNameAsString(), e);
}
}
}
@@ -461,7 +465,8 @@ protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest reque
throws IOException {
Collection<CompactionPartition> partitions = request.compactionPartitions;
if (partitions == null || partitions.isEmpty()) {
LOG.info("No partitions of mob files");
LOG.info("No partitions of mob files in table='{}' and column='{}'", tableName,
column.getNameAsString());
return Collections.emptyList();
}
List<Path> paths = new ArrayList<>();
@@ -483,7 +488,8 @@ protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest reque
results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
@Override
public List<Path> call() throws Exception {
LOG.info("Compacting mob files for partition " + partition.getPartitionId());
LOG.info("Compacting mob files for partition {} for table='{}' and column='{}'",
partition.getPartitionId(), tableName, column.getNameAsString());
return compactMobFilePartition(request, partition, delFiles, c, table);
}
}));
@@ -495,13 +501,15 @@ public List<Path> call() throws Exception {
paths.addAll(result.getValue().get());
} catch (Exception e) {
// just log the error
LOG.error("Failed to compact the partition " + result.getKey(), e);
LOG.error("Failed to compact the partition {} for table='{}' and column='{}'",
result.getKey(), tableName, column.getNameAsString(), e);
failedPartitions.add(result.getKey());
}
}
if (!failedPartitions.isEmpty()) {
// if any partition fails in the compaction, directly throw an exception.
throw new IOException("Failed to compact the partitions " + failedPartitions);
throw new IOException("Failed to compact the partitions " + failedPartitions +
" for table='" + tableName + "' column='" + column.getNameAsString() + "'");
}
} finally {
try {
@@ -567,8 +575,9 @@ private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest reque
// move to the next batch.
offset += batch;
}
LOG.info("Compaction is finished. The number of mob files is changed from " + files.size()
+ " to " + newFiles.size());
LOG.info("Compaction is finished. The number of mob files is changed from {} to {} for " +
"partition={} for table='{}' and column='{}'", files.size(), newFiles.size(),
partition.getPartitionId(), tableName, column.getNameAsString());
return newFiles;
}

@@ -675,8 +684,12 @@ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
cleanupTmpMobFile = false;
cleanupCommittedMobFile = true;
// bulkload the ref file
LOG.info("start MOB ref bulkload for partition {} table='{}' column='{}'",
partition.getPartitionId(), tableName, column.getNameAsString());
bulkloadRefFile(table.getName(), bulkloadPathOfPartition, filePath.getName());
cleanupCommittedMobFile = false;
LOG.info("end MOB ref bulkload for partition {} table='{}' column='{}'",
partition.getPartitionId(), tableName, column.getNameAsString());
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
}

@@ -703,7 +716,11 @@ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
}

if (cleanupCommittedMobFile) {
deletePath(new Path(mobFamilyDir, filePath.getName()));
LOG.error("failed MOB ref bulkload for partition {} table='{}' column='{}'",
partition.getPartitionId(), tableName, column.getNameAsString());
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
Collections.singletonList(new HStoreFile(fs, new Path(mobFamilyDir, filePath.getName()),
conf, compactionCacheConfig, BloomType.NONE, true)));
}
}
}
@@ -901,6 +918,7 @@ private Pair<Long, Long> getFileInfo(List<HStoreFile> storeFiles) throws IOExcep
* @param path The path of the file to be deleted.
*/
private void deletePath(Path path) {
LOG.debug("Cleanup, delete path '{}'", path);
try {
if (path != null) {
fs.delete(path, true);
@@ -34,14 +34,19 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mob.ManualMobMaintHFileCleaner;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -96,6 +101,41 @@ public void testTTLCleaner() throws IOException, InterruptedException {
+ " with create time:" + createTime, cleaner.isFileDeletable(fs.getFileStatus(file)));
}

@Test
public void testManualMobCleanerStopsMobRemoval() throws IOException {
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
Path root = UTIL.getDataTestDirOnTestFS();
TableName table = TableName.valueOf("testManualMobCleanerStopsMobRemoval");
Path mob = HFileArchiveUtil.getRegionArchiveDir(root, table, MobUtils.getMobRegionInfo(table).getEncodedName());
Path family= new Path(mob, "family");

Path file = new Path(family, "someHFileThatWouldBeAUUID");
fs.createNewFile(file);
assertTrue("Test file not created!", fs.exists(file));

ManualMobMaintHFileCleaner cleaner = new ManualMobMaintHFileCleaner();

assertFalse("Mob File shouldn't have been deletable. check path. '"+file+"'", cleaner.isFileDeletable(fs.getFileStatus(file)));
}

@Test
public void testManualMobCleanerLetsNonMobGo() throws IOException {
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
Path root = UTIL.getDataTestDirOnTestFS();
TableName table = TableName.valueOf("testManualMobCleanerLetsNonMobGo");
Path nonmob = HFileArchiveUtil.getRegionArchiveDir(root, table, new HRegionInfo(table).getEncodedName());
Path family= new Path(nonmob, "family");

Path file = new Path(family, "someHFileThatWouldBeAUUID");
fs.createNewFile(file);
assertTrue("Test file not created!", fs.exists(file));

ManualMobMaintHFileCleaner cleaner = new ManualMobMaintHFileCleaner();

assertTrue("Non-Mob File should have been deletable. check path. '"+file+"'", cleaner.isFileDeletable(fs.getFileStatus(file)));

}

/**
* @param file to check
* @return loggable information about the file
@@ -114,7 +154,7 @@ public void testHFileCleaning() throws Exception {
// set TTL
long ttl = 2000;
conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
"org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner");
"org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner,org.apache.hadoop.hbase.mob.ManualMobMaintHFileCleaner");
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
Server server = new DummyServer();
Path archivedHfileDir =
2 changes: 2 additions & 0 deletions hbase-server/src/test/resources/log4j.properties
@@ -58,6 +58,8 @@ log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.hadoop.hbase=DEBUG

log4j.logger.org.apache.hadoop.hbase.mob.ManualMobMaintHFileCleaner=DEBUG

#These settings are workarounds against spurious logs from the minicluster.
#See HBASE-4709
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN