Skip to content

HBASE-26969:Eliminate MOB renames when SFT is enabled #4418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,20 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
* smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
mobRefSet.get().clear();
boolean isUserRequest = userRequest.get();
boolean major = request.isAllFiles();
boolean compactMOBs = major && isUserRequest;
boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
MobConstants.DEFAULT_MOB_DISCARD_MISS);
Expand Down Expand Up @@ -350,12 +350,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

Cell mobCell = null;
try {

mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

do {
Expand Down Expand Up @@ -435,7 +435,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
mobFileWriter.getPath().getName(), getStoreInfo());
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
Expand Down Expand Up @@ -479,7 +479,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
Expand Down Expand Up @@ -531,7 +531,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
mobFileWriter = newMobWriter(fd, major);
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
Expand Down Expand Up @@ -617,11 +617,16 @@ private void clearThreadLocals() {
}
}

private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOException {
private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
Consumer<Path> writerCreationTracker) throws IOException {
try {
StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs),
fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true);
StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true)
: mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
major ? majorCompactionCompression : minorCompactionCompression,
store.getRegionInfo().getStartKey(), true, writerCreationTracker);
LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
getStoreInfo());
// Add reference we get for compact MOB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
// between a normal and a mob store.
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController,
writerCreationTracker);
} catch (IOException ioe) {
e = ioe;
// throw the exception out
Expand Down Expand Up @@ -171,16 +172,21 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
ThroughputController throughputController) throws IOException {
ThroughputController throughputController, Consumer<Path> writerCreationTracker)
throws IOException {
StoreFileWriter mobFileWriter = null;
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
long mobCount = 0;
long mobSize = 0;
long time = snapshot.getTimeRangeTracker().getMax();
mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false);
mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false)
: mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
false, writerCreationTracker);
// the target path is {tableName}/.mob/{cfName}/mobFiles
// the relative path is mobFiles
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,15 @@ public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws
maxCreationTimeToArchive, table);
}

FileSystem fs = FileSystem.get(conf);
Set<String> regionNames = new HashSet<>();
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
// How safe is this call?
List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf), tableDir);
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);

Set<String> allActiveMobFileName = new HashSet<String>();
FileSystem fs = FileSystem.get(conf);
for (Path regionPath : regionDirs) {
regionNames.add(regionPath.getName());
for (ColumnFamilyDescriptor hcd : list) {
String family = hcd.getNameAsString();
Path storePath = new Path(regionPath, family);
Expand Down Expand Up @@ -195,13 +196,26 @@ public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws
for (Path pp : storeFiles) {
currentPath = pp;
LOG.trace("Store file: {}", pp);
HStoreFile sf =
new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf.initReader();
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
HStoreFile sf = null;
byte[] mobRefData = null;
byte[] bulkloadMarkerData = null;
try {
sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
sf.initReader();
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
} catch (IOException ex) {
// When FileBased SFT is active the store dir can contain corrupted or incomplete
// files. So read errors are expected. We just skip these files.
if (ex instanceof FileNotFoundException) {
throw ex;
}
LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(),
ex);
continue;
}
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
Expand Down Expand Up @@ -256,9 +270,11 @@ public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
if (!allActiveMobFileName.contains(p.getName())) {
// MOB is not in a list of active references, but it can be too
// fresh, skip it in this case
String[] mobParts = p.getName().split("_");
String regionName = mobParts[mobParts.length - 1];

if (!regionNames.contains(regionName)) {
// MOB belonged to a region no longer hosted
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
Expand All @@ -269,7 +285,7 @@ public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws
fs.getFileStatus(p).getModificationTime());
}
} else {
LOG.trace("Keeping active MOB file: {}", p);
LOG.trace("Keeping MOB file with existing region: {}", p);
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -564,6 +565,33 @@ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction)
throws IOException {
return createWriter(conf, fs, family, path, maxKeyCount, compression, cacheConfig,
cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType, isCompaction, null);
}

/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param path The path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param checksumType The checksum type.
* @param bytesPerChecksum The bytes per checksum.
* @param blocksize The HFile block size.
* @param bloomType The bloom filter type.
* @param isCompaction If the writer is used in compaction.
* @param writerCreationTracker to track the current writer in the store
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression,
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction,
Consumer<Path> writerCreationTracker) throws IOException {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
Expand All @@ -582,7 +610,8 @@ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();

StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path)
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
.withWriterCreationTracker(writerCreationTracker).build();
return w;
}

Expand Down
Loading