Skip to content

Commit 00f078a

Browse files
authored
HBASE-25972 Dual File Compaction (#5545)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent 716adf5 commit 00f078a

26 files changed

+1272
-319
lines changed

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java

Lines changed: 47 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -181,8 +181,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
181181
addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
182182
"Run random seek scan with both start and stop row (max 10000 rows)");
183183
addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
184+
addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
184185
addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
185186
addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
187+
addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
188+
"Run sequential delete test");
186189
addCommandDescriptor(MetaWriteTest.class, "metaWrite",
187190
"Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
188191
addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
@@ -352,7 +355,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
352355
boolean needsDelete = false, exists = admin.tableExists(tableName);
353356
boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
354357
|| opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
355-
if (!exists && isReadCmd) {
358+
boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
359+
if (!exists && (isReadCmd || isDeleteCmd)) {
356360
throw new IllegalStateException(
357361
"Must specify an existing table for read commands. Run a write command first.");
358362
}
@@ -367,7 +371,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
367371
&& opts.presplitRegions != admin.getRegions(tableName).size())
368372
|| (!isReadCmd && desc != null
369373
&& !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
370-
|| (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
374+
|| (!(isReadCmd || isDeleteCmd) && desc != null
375+
&& desc.getRegionReplication() != opts.replicas)
371376
|| (desc != null && desc.getColumnFamilyCount() != opts.families)
372377
) {
373378
needsDelete = true;
@@ -2071,6 +2076,18 @@ protected byte[] generateRow(final int i) {
20712076

20722077
}
20732078

2079+
/**
 * Delete test whose target row comes from {@code getRandomRow} rather than from the loop index.
 * Only row generation is overridden here; the delete logic itself is inherited from
 * {@link SequentialDeleteTest}.
 */
static class RandomDeleteTest extends SequentialDeleteTest {
2080+
RandomDeleteTest(Connection con, TestOptions options, Status status) {
2081+
super(con, options, status);
2082+
}
2083+
2084+
@Override
2085+
protected byte[] generateRow(final int i) {
2086+
// The index i is not used; the row is obtained from getRandomRow(rand, opts.totalRows).
return getRandomRow(this.rand, opts.totalRows);
2087+
}
2088+
2089+
}
2090+
20742091
static class ScanTest extends TableTest {
20752092
private ResultScanner testScanner;
20762093

@@ -2406,6 +2423,34 @@ boolean testRow(final int i, final long startTime) throws IOException {
24062423
}
24072424
}
24082425

2426+
/**
 * Delete test that issues one {@link Delete} per tested row, using the row returned by
 * {@link #generateRow(int)} (here {@code format(i)}). Subclasses may override
 * {@code generateRow} to choose rows differently.
 */
static class SequentialDeleteTest extends BufferedMutatorTest {
2427+
2428+
SequentialDeleteTest(Connection con, TestOptions options, Status status) {
2429+
super(con, options, status);
2430+
}
2431+
2432+
// Row for iteration i; this implementation returns format(i).
protected byte[] generateRow(final int i) {
2433+
return format(i);
2434+
}
2435+
2436+
/**
 * Builds a Delete for the generated row, calls {@code addFamily} for each of the
 * {@code opts.families} families (named {@code FAMILY_NAME_BASE + index}), and submits it.
 * @param i         index passed to {@link #generateRow(int)}
 * @param startTime not used by this implementation
 * @return always {@code true}
 */
@Override
2437+
boolean testRow(final int i, final long startTime) throws IOException {
2438+
byte[] row = generateRow(i);
2439+
Delete delete = new Delete(row);
2440+
for (int family = 0; family < opts.families; family++) {
2441+
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2442+
delete.addFamily(familyName);
2443+
}
2444+
// SYNC_WAL when opts.writeToWAL is set, SKIP_WAL otherwise.
delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2445+
// With autoFlush the delete goes through table.delete; otherwise it is handed to the mutator.
if (opts.autoFlush) {
2446+
table.delete(delete);
2447+
} else {
2448+
mutator.mutate(delete);
2449+
}
2450+
return true;
2451+
}
2452+
}
2453+
24092454
/*
24102455
* Insert fake regions into meta table with contiguous split keys.
24112456
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -537,6 +537,7 @@ private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo) throws
537537
Bytes.equals(e.getKey(), HStoreFile.MAJOR_COMPACTION_KEY)
538538
|| Bytes.equals(e.getKey(), HFileInfo.TAGS_COMPRESSED)
539539
|| Bytes.equals(e.getKey(), HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY)
540+
|| Bytes.equals(e.getKey(), HStoreFile.HISTORICAL_KEY)
540541
) {
541542
out.println(Bytes.toBoolean(e.getValue()));
542543
} else if (Bytes.equals(e.getKey(), HFileInfo.LASTKEY)) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -162,7 +162,7 @@ private boolean isCompactedFile(FileStatus file, HStore store) {
162162
}
163163

164164
private boolean isActiveStorefile(FileStatus file, HStore store) {
165-
return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
165+
return store.getStoreEngine().getStoreFileManager().getStoreFiles().stream()
166166
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
167167
}
168168

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.regionserver;
1919

2020
import java.io.IOException;
21+
import java.util.List;
2122
import org.apache.hadoop.hbase.Cell;
2223
import org.apache.hadoop.hbase.util.BloomFilterWriter;
2324
import org.apache.yetus.audience.InterfaceAudience;
@@ -34,4 +35,14 @@ public interface CellSink {
3435
* @param cell the cell to be added
3536
*/
3637
void append(Cell cell) throws IOException;
38+
39+
/**
40+
* Append the given (possibly partial) list of cells of a row.
* <p>
* This default implementation calls {@link #append(Cell)} once per cell, in the iteration order
* of the list; an empty list results in no calls.
41+
* @param cellList the cell list to be added
* @throws IOException if {@link #append(Cell)} throws for any cell
42+
*/
43+
default void appendAll(List<Cell> cellList) throws IOException {
44+
for (Cell cell : cellList) {
45+
append(cell);
46+
}
47+
}
3748
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
4343
DateTieredCompactionPolicy, DateTieredCompactor, DefaultStoreFileManager> {
4444
@Override
4545
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
46-
return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), filesCompacting);
46+
return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting);
4747
}
4848

4949
@Override
@@ -65,14 +65,14 @@ private final class DateTieredCompactionContext extends CompactionContext {
6565

6666
@Override
6767
public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
68-
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
68+
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(),
6969
filesCompacting);
7070
}
7171

7272
@Override
7373
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
7474
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
75-
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
75+
request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting,
7676
isUserCompaction, mayUseOffPeak, forceMajor);
7777
return request != null;
7878
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher, RatioBa
5656

5757
@Override
5858
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
59-
return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting);
59+
return compactionPolicy.needsCompaction(this.storeFileManager.getStoreFiles(), filesCompacting);
6060
}
6161

6262
@Override
@@ -111,7 +111,7 @@ private class DefaultCompactionContext extends CompactionContext {
111111
@Override
112112
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
113113
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
114-
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
114+
request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting,
115115
isUserCompaction, mayUseOffPeak, forceMajor);
116116
return request != null;
117117
}
@@ -124,7 +124,7 @@ public List<Path> compact(ThroughputController throughputController, User user)
124124

125125
@Override
126126
public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
127-
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
127+
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(),
128128
filesCompacting);
129129
}
130130
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java

Lines changed: 88 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,11 @@
1717
*/
1818
package org.apache.hadoop.hbase.regionserver;
1919

20+
import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles;
21+
22+
import edu.umd.cs.findbugs.annotations.Nullable;
2023
import java.io.IOException;
24+
import java.util.ArrayList;
2125
import java.util.Collection;
2226
import java.util.Comparator;
2327
import java.util.Iterator;
@@ -48,36 +52,71 @@ class DefaultStoreFileManager implements StoreFileManager {
4852
private final CompactionConfiguration comConf;
4953
private final int blockingFileCount;
5054
private final Comparator<HStoreFile> storeFileComparator;
51-
/**
52-
* List of store files inside this store. This is an immutable list that is atomically replaced
53-
* when its contents change.
54-
*/
55-
private volatile ImmutableList<HStoreFile> storefiles = ImmutableList.of();
55+
56+
/**
 * Immutable holder pairing the full store file list with the live-file list. Both fields are
 * final, so a change to either list is made by building a new StoreFileList and assigning it to
 * the enclosing manager's volatile {@code storeFiles} field.
 */
static class StoreFileList {
57+
/**
58+
* List of all store files inside this store. The list itself is immutable; it changes only when
59+
* the enclosing StoreFileList instance is replaced.
60+
*/
61+
final ImmutableList<HStoreFile> all;
62+
/**
63+
* List of store files that include the latest cells inside this store. Immutable, and replaced
64+
* together with {@code all}. The enclosing manager passes {@code null} here when live file
* tracking is disabled.
65+
*/
66+
@Nullable
67+
final ImmutableList<HStoreFile> live;
68+
69+
// liveStoreFiles may be null (see the @Nullable live field).
StoreFileList(ImmutableList<HStoreFile> storeFiles, ImmutableList<HStoreFile> liveStoreFiles) {
70+
this.all = storeFiles;
71+
this.live = liveStoreFiles;
72+
}
73+
}
74+
75+
private volatile StoreFileList storeFiles;
76+
5677
/**
5778
* List of compacted files inside this store that needs to be excluded in reads because further
5879
* new reads will be using only the newly created files out of compaction. These compacted files
5980
* will be deleted/cleared once all the existing readers on these compacted files are done.
6081
*/
6182
private volatile ImmutableList<HStoreFile> compactedfiles = ImmutableList.of();
83+
private final boolean enableLiveFileTracking;
6284

6385
public DefaultStoreFileManager(CellComparator cellComparator,
6486
Comparator<HStoreFile> storeFileComparator, Configuration conf,
6587
CompactionConfiguration comConf) {
6688
this.cellComparator = cellComparator;
6789
this.storeFileComparator = storeFileComparator;
6890
this.comConf = comConf;
69-
this.blockingFileCount =
91+
blockingFileCount =
7092
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
93+
enableLiveFileTracking = shouldEnableHistoricalCompactionFiles(conf);
94+
storeFiles =
95+
new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
96+
}
97+
98+
/**
 * Returns the subset of the given files for which {@code isHistorical()} is false, in the
 * iteration order of the input. Each file's reader is initialized before it is checked.
 * @param storeFiles the files to filter
 * @return a new mutable list holding the non-historical files
 * @throws IOException if {@code initReader()} fails for a file
 */
private List<HStoreFile> getLiveFiles(Collection<HStoreFile> storeFiles) throws IOException {
99+
List<HStoreFile> liveFiles = new ArrayList<>(storeFiles.size());
100+
for (HStoreFile file : storeFiles) {
101+
// NOTE(review): presumably isHistorical() depends on metadata loaded by initReader() — confirm.
file.initReader();
102+
if (!file.isHistorical()) {
103+
liveFiles.add(file);
104+
}
105+
}
106+
return liveFiles;
71107
}
72108

73109
@Override
74-
public void loadFiles(List<HStoreFile> storeFiles) {
75-
this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles);
110+
public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
111+
this.storeFiles = new StoreFileList(ImmutableList.sortedCopyOf(storeFileComparator, storeFiles),
112+
enableLiveFileTracking
113+
? ImmutableList.sortedCopyOf(storeFileComparator, getLiveFiles(storeFiles))
114+
: null);
76115
}
77116

78117
@Override
79-
public final Collection<HStoreFile> getStorefiles() {
80-
return storefiles;
118+
public final Collection<HStoreFile> getStoreFiles() {
119+
return storeFiles.all;
81120
}
82121

83122
@Override
@@ -86,15 +125,20 @@ public Collection<HStoreFile> getCompactedfiles() {
86125
}
87126

88127
@Override
89-
public void insertNewFiles(Collection<HStoreFile> sfs) {
90-
this.storefiles =
91-
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs));
128+
public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
129+
storeFiles = new StoreFileList(
130+
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(storeFiles.all, sfs)),
131+
enableLiveFileTracking
132+
? ImmutableList.sortedCopyOf(storeFileComparator,
133+
Iterables.concat(storeFiles.live, getLiveFiles(sfs)))
134+
: null);
92135
}
93136

94137
@Override
95138
public ImmutableCollection<HStoreFile> clearFiles() {
96-
ImmutableList<HStoreFile> result = storefiles;
97-
storefiles = ImmutableList.of();
139+
ImmutableList<HStoreFile> result = storeFiles.all;
140+
storeFiles =
141+
new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
98142
return result;
99143
}
100144

@@ -107,7 +151,7 @@ public Collection<HStoreFile> clearCompactedFiles() {
107151

108152
@Override
109153
public final int getStorefileCount() {
110-
return storefiles.size();
154+
return storeFiles.all.size();
111155
}
112156

113157
@Override
@@ -117,28 +161,38 @@ public final int getCompactedFilesCount() {
117161

118162
@Override
119163
public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
120-
Collection<HStoreFile> results) {
121-
this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables
122-
.concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
164+
Collection<HStoreFile> results) throws IOException {
165+
ImmutableList<HStoreFile> liveStoreFiles = null;
166+
if (enableLiveFileTracking) {
167+
liveStoreFiles = ImmutableList.sortedCopyOf(storeFileComparator,
168+
Iterables.concat(Iterables.filter(storeFiles.live, sf -> !newCompactedfiles.contains(sf)),
169+
getLiveFiles(results)));
170+
}
171+
storeFiles =
172+
new StoreFileList(
173+
ImmutableList
174+
.sortedCopyOf(storeFileComparator,
175+
Iterables.concat(
176+
Iterables.filter(storeFiles.all, sf -> !newCompactedfiles.contains(sf)), results)),
177+
liveStoreFiles);
123178
// Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
124179
// Let a background thread close the actual reader on these compacted files and also
125180
// ensure to evict the blocks from block cache so that they are no longer in
126181
// cache
127182
newCompactedfiles.forEach(HStoreFile::markCompactedAway);
128-
this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
129-
Iterables.concat(this.compactedfiles, newCompactedfiles));
183+
compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
184+
Iterables.concat(compactedfiles, newCompactedfiles));
130185
}
131186

132187
@Override
133188
public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
134-
this.compactedfiles =
135-
this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
136-
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
189+
compactedfiles = compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
190+
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
137191
}
138192

139193
@Override
140194
public final Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey) {
141-
return this.storefiles.reverse().iterator();
195+
return storeFiles.all.reverse().iterator();
142196
}
143197

144198
@Override
@@ -153,25 +207,28 @@ public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
153207

154208
@Override
155209
public final Optional<byte[]> getSplitPoint() throws IOException {
156-
return StoreUtils.getSplitPoint(storefiles, cellComparator);
210+
return StoreUtils.getSplitPoint(storeFiles.all, cellComparator);
157211
}
158212

159213
@Override
160-
public final Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
161-
byte[] stopRow, boolean includeStopRow) {
214+
public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
215+
byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) {
216+
if (onlyLatestVersion && enableLiveFileTracking) {
217+
return storeFiles.live;
218+
}
162219
// We cannot provide any useful input and already have the files sorted by seqNum.
163-
return getStorefiles();
220+
return getStoreFiles();
164221
}
165222

166223
@Override
167224
public int getStoreCompactionPriority() {
168-
int priority = blockingFileCount - storefiles.size();
225+
int priority = blockingFileCount - storeFiles.all.size();
169226
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
170227
}
171228

172229
@Override
173230
public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
174-
ImmutableList<HStoreFile> files = storefiles;
231+
ImmutableList<HStoreFile> files = storeFiles.all;
175232
// 1) We can never get rid of the last file which has the maximum seqid.
176233
// 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
177234
return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> {

0 commit comments

Comments
 (0)