hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1084,27 +1084,44 @@ public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
}

/**
* @param path The pathname of the tmp file into which the store was flushed
* @return store file created.
* Commit the given {@code files}.
* <p/>
* We will move the file into data directory, and open it.
* @param files the files want to commit
* @param validate whether to validate the store files
* @return the committed store files
*/
private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
throws IOException {
// Write-out finished successfully, move into the right spot
Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);

status.setStatus("Flushing " + this + ": reopening flushed file");
HStoreFile sf = createStoreFileAndReader(dstPath);

StoreFileReader r = sf.getReader();
this.storeSize.addAndGet(r.length());
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

if (LOG.isInfoEnabled()) {
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
", sequenceid=" + logCacheFlushId +
", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
private List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
List<HStoreFile> committedFiles = new ArrayList<>(files.size());
HRegionFileSystem hfs = getRegionFileSystem();
String familyName = getColumnFamilyName();
for (Path file : files) {
try {
if (validate) {
validateStoreFile(file);
}
Path committedPath = hfs.commitStoreFile(familyName, file);
HStoreFile sf = createStoreFileAndReader(committedPath);
committedFiles.add(sf);
} catch (IOException e) {
LOG.error("Failed to commit store file {}", file, e);
// Try to delete the files we have committed before.
// It is OK to fail when deleting as leaving the file there does not cause any data
// corruption problem. It just introduces some duplicated data which may impact read
// performance a little when reading before compaction.
for (HStoreFile sf : committedFiles) {
Path pathToDelete = sf.getPath();
try {
sf.deleteStoreFile();
} catch (IOException deleteEx) {
LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
deleteEx);
}
}
throw new IOException("Failed to commit the flush", e);
}
}
return sf;
return committedFiles;
}

public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
@@ -1501,7 +1518,12 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
List<HStoreFile> sfs = commitStoreFiles(newFiles, true);
if (this.getCoprocessorHost() != null) {
for (HStoreFile sf : sfs) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
}
}
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
Expand Down Expand Up @@ -1542,29 +1564,6 @@ private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOExceptio
}
}

private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
List<Path> newFiles, User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
}
assert sf != null;
sfs.add(sf);
}
return sfs;
}

// Package-visible for tests
HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
validateStoreFile(newFile);
// Move the file into the right spot
Path destPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), newFile);
return createStoreFileAndReader(destPath);
}

/**
* Writes the compaction WAL record.
* @param filesCompacted Files compacted (input).
Expand Down Expand Up @@ -2346,42 +2345,31 @@ public boolean commit(MonitoredTask status) throws IOException {
if (CollectionUtils.isEmpty(this.tempFiles)) {
return false;
}
List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
for (Path storeFilePath : tempFiles) {
try {
HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
outputFileSize += sf.getReader().length();
storeFiles.add(sf);
} catch (IOException ex) {
LOG.error("Failed to commit store file {}", storeFilePath, ex);
// Try to delete the files we have committed before.
for (HStoreFile sf : storeFiles) {
Path pathToDelete = sf.getPath();
try {
sf.deleteStoreFile();
} catch (IOException deleteEx) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
+ "halting {}", pathToDelete, ex);
Runtime.getRuntime().halt(1);
}
}
throw new IOException("Failed to commit the flush", ex);
status.setStatus("Flushing " + this + ": reopening flushed file");
List<HStoreFile> storeFiles = commitStoreFiles(tempFiles, false);
for (HStoreFile sf : storeFiles) {
StoreFileReader r = sf.getReader();
if (LOG.isInfoEnabled()) {
LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
}
outputFileSize += r.length();
storeSize.addAndGet(r.length());
totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
committedFiles.add(sf.getPath());
}

flushedCellsCount.addAndGet(cacheFlushCount);
flushedCellsSize.addAndGet(cacheFlushSize);
flushedOutputFileSize.addAndGet(outputFileSize);
// call coprocessor after we have done all the accounting above
for (HStoreFile sf : storeFiles) {
if (HStore.this.getCoprocessorHost() != null) {
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
if (getCoprocessorHost() != null) {
getCoprocessorHost().postFlush(HStore.this, sf, tracker);
}
committedFiles.add(sf.getPath());
}

HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);

// Add new file to store files. Clear snapshot too while we have the Store write lock.
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return updateStorefiles(storeFiles, snapshot.getId());
}

@Override
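For orientation, here is a minimal, self-contained sketch (not part of this change) of the commit-with-rollback pattern that the new commitStoreFiles(List<Path>, boolean) helper implements: optionally validate each temp file, move it into the data directory, and on any failure delete the files already committed before rethrowing. All type and method names below are hypothetical stand-ins, not HBase APIs.

// Illustrative only; TempFile, CommittedFile and StoreFileSystem are invented interfaces.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class CommitWithRollbackSketch {

  interface TempFile {
    void validate() throws IOException;          // e.g. open and sanity-check the file
  }

  interface CommittedFile {
    void delete() throws IOException;            // best-effort rollback
  }

  interface StoreFileSystem {
    CommittedFile commit(TempFile file) throws IOException;  // move into data dir and open
  }

  static List<CommittedFile> commitAll(StoreFileSystem fs, List<TempFile> files,
      boolean validate) throws IOException {
    List<CommittedFile> committed = new ArrayList<>(files.size());
    for (TempFile file : files) {
      try {
        if (validate) {
          file.validate();
        }
        committed.add(fs.commit(file));
      } catch (IOException e) {
        // Roll back whatever was already committed. A failed delete only leaves duplicate
        // data behind (slightly slower reads until the next compaction), so it is tolerated.
        for (CommittedFile sf : committed) {
          try {
            sf.delete();
          } catch (IOException ignored) {
            // leaving the file in place does not corrupt data
          }
        }
        throw new IOException("Failed to commit files", e);
      }
    }
    return committed;
  }
}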
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -25,6 +25,7 @@
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -36,6 +37,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@@ -72,6 +74,7 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
@@ -85,8 +88,6 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test compaction framework and common functions
@@ -98,8 +99,6 @@ public class TestCompaction {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompaction.class);

private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);

@Rule
public TestName name = new TestName();
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
@@ -346,32 +345,22 @@ public void testCompactionWithCorruptResult() throws Exception {
HStore store = r.getStore(COLUMN_FAMILY);

Collection<HStoreFile> storeFiles = store.getStorefiles();
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
tool.compact(request, NoLimitThroughputController.INSTANCE, null);

// Now lets corrupt the compacted file.
FileSystem fs = store.getFileSystem();
// default compaction policy created one and only one new compacted file
Path dstPath = store.getRegionFileSystem().createTempName();
FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, 1024L, null);
stream.writeChars("CORRUPT FILE!!!!");
stream.close();
Path origPath = store.getRegionFileSystem().commitStoreFile(
Bytes.toString(COLUMN_FAMILY), dstPath);

try {
((HStore)store).moveFileIntoPlace(origPath);
} catch (Exception e) {
// The complete compaction should fail and the corrupt file should remain
// in the 'tmp' directory;
assertTrue(fs.exists(origPath));
assertFalse(fs.exists(dstPath));
LOG.info("testCompactionWithCorruptResult Passed");
return;
}
fail("testCompactionWithCorruptResult failed since no exception was" +
"thrown while completing a corrupt file");
Path tmpPath = store.getRegionFileSystem().createTempName();
try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
stream.writeChars("CORRUPT FILE!!!!");
}
// The complete compaction should fail and the corrupt file should remain
// in the 'tmp' directory;
assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
assertTrue(fs.exists(tmpPath));
}

/**