Skip to content

Commit 8c003fb

Browse files
committed
HBASE-26938 Compaction failures after StoreFileTracker integration
Compactions might be concurrent against a given store and the Compactor is shared among them. Do not put mutable state into shared class fields. All Compactor class fields should be final or effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
1 parent 242a194 commit 8c003fb

File tree

13 files changed

+138
-121
lines changed

13 files changed

+138
-121
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hadoop.hbase.KeyValue;
3939
import org.apache.hadoop.hbase.PrivateCellUtil;
4040
import org.apache.hadoop.hbase.TableName;
41+
import org.apache.hadoop.hbase.regionserver.CellSink;
4142
import org.apache.hadoop.hbase.regionserver.HMobStore;
4243
import org.apache.hadoop.hbase.regionserver.HStore;
4344
import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +52,7 @@
5152
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
5253
import org.apache.hadoop.hbase.regionserver.StoreScanner;
5354
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
55+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
5456
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
5557
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
5658
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -61,7 +63,6 @@
6163
import org.apache.yetus.audience.InterfaceAudience;
6264
import org.slf4j.Logger;
6365
import org.slf4j.LoggerFactory;
64-
6566
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
6667
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
6768
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -285,17 +286,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
285286
* </ol>
286287
* @param fd File details
287288
* @param scanner Where to read from.
289+
* @param writer Where to write to.
288290
* @param smallestReadPoint Smallest read point.
289291
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
290292
* @param throughputController The compaction throughput controller.
291293
* @param major Is a major compaction.
292294
* @param numofFilesToCompact the number of files to compact
295+
* @param progress Progress reporter.
293296
* @return Whether compaction ended; false if it was interrupted for any reason.
294297
*/
295298
@Override
296-
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
299+
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
297300
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
298-
boolean major, int numofFilesToCompact) throws IOException {
301+
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
299302
long bytesWrittenProgressForLog = 0;
300303
long bytesWrittenProgressForShippedCall = 0;
301304
// Clear old mob references
@@ -661,9 +664,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
661664
}
662665
}
663666

664-
665667
@Override
666-
protected List<Path> commitWriter(FileDetails fd,
668+
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
667669
CompactionRequestImpl request) throws IOException {
668670
List<Path> newFiles = Lists.newArrayList(writer.getPath());
669671
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
156156
// rows that has cells from both memstore and files (or only files)
157157
private LongAdder mixedRowReadsCount = new LongAdder();
158158

159-
private boolean cacheOnWriteLogged;
160-
161159
/**
162160
* Lock specific to archiving compacted store files. This avoids races around
163161
* the combination of retrieving the list of compacted files and moving them to
@@ -290,7 +288,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
290288
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
291289
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
292290
family.getCompressionType());
293-
cacheOnWriteLogged = false;
294291
}
295292

296293
private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -1157,23 +1154,28 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
11571154
}
11581155
replaceStoreFiles(filesToCompact, sfs, true);
11591156

1160-
// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
1161-
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
1162-
// have failed.
1163-
storeEngine.resetCompactionWriter();
1164-
1165-
if (cr.isMajor()) {
1166-
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
1167-
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
1168-
} else {
1169-
compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
1170-
compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
1157+
// Compaction progress for the request will be removed after completeCompaction so be sure
1158+
// this code runs before you call completeCompaction.
1159+
CompactionProgress progress = storeEngine.getCompactor().getProgress(cr);
1160+
if (progress != null) {
1161+
if (cr.isMajor()) {
1162+
majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
1163+
majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
1164+
} else {
1165+
compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
1166+
compactedCellsSize.addAndGet(progress.totalCompactedSize);
1167+
}
11711168
}
11721169
long outputBytes = getTotalSize(sfs);
11731170

11741171
// At this point the store will use new files for all new scanners.
11751172
refreshStoreSizeAndTotalBytes(); // update store size.
11761173

1174+
// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
1175+
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
1176+
// has failed.
1177+
storeEngine.completeCompaction(cr);
1178+
11771179
long now = EnvironmentEdgeManager.currentTime();
11781180
if (region.getRegionServerServices() != null
11791181
&& region.getRegionServerServices().getMetrics() != null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
*/
9595
@InterfaceAudience.Private
9696
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
97-
C extends Compactor, SFM extends StoreFileManager> {
97+
C extends Compactor<?>, SFM extends StoreFileManager> {
9898

9999
private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
100100

@@ -157,7 +157,7 @@ public CompactionPolicy getCompactionPolicy() {
157157
/**
158158
* @return Compactor to use.
159159
*/
160-
public Compactor getCompactor() {
160+
public Compactor<?> getCompactor() {
161161
return this.compactor;
162162
}
163163

@@ -545,14 +545,15 @@ public boolean requireWritingToTmpDirFirst() {
545545
}
546546

547547
/**
548-
* Resets the compaction writer when the new file is committed and used as active storefile.
549-
* This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
550-
* CleanerChore know that compaction is done and the file can be cleaned up if compaction
551-
* have failed. Currently called in
548+
* Completes the compaction, cleaning up resources, once the new file is committed and used as
549+
* active storefile. This step is necessary for the correctness of BrokenStoreFileCleanerChore.
550+
* It lets the CleanerChore know that compaction is done and the file can be cleaned up if
551+
* compaction has failed. Currently called in
552+
* @param request the compaction request
552553
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
553554
*/
554-
public void resetCompactionWriter(){
555-
compactor.resetWriter();
555+
public void completeCompaction(CompactionRequestImpl request) {
556+
compactor.completeCompaction(request);
556557
}
557558

558559
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
6868
}
6969

7070
@Override
71-
protected void abortWriter() throws IOException {
71+
protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
7272
FileSystem fs = store.getFileSystem();
7373
for (Path leftoverFile : writer.abortWriters()) {
7474
try {
@@ -79,7 +79,6 @@ protected void abortWriter() throws IOException {
7979
e);
8080
}
8181
}
82-
//this step signals that the target file is no longer written and can be cleaned up
83-
writer = null;
8482
}
83+
8584
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class CompactionProgress {
3737
private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);
3838

3939
/** the total compacting key values in currently running compaction */
40-
private long totalCompactingKVs;
40+
public long totalCompactingKVs;
4141
/** the completed count of key values in currently running compaction */
4242
public long currentCompactedKVs = 0;
4343
/** the total size of data processed by the currently running compaction, in bytes */

0 commit comments

Comments
 (0)