Skip to content

Commit e89afdf

Browse files
wchevreuil authored and Apache9 committed
HBASE-26079 Use StoreFileTracker when splitting and merging (#3617)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 826e54c commit e89afdf

File tree

12 files changed

+476
-44
lines changed

12 files changed

+476
-44
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Collections;
2525
import java.util.List;
2626
import java.util.stream.Stream;
27-
2827
import org.apache.hadoop.conf.Configuration;
2928
import org.apache.hadoop.fs.FileSystem;
3029
import org.apache.hadoop.fs.Path;
@@ -57,6 +56,8 @@
5756
import org.apache.hadoop.hbase.regionserver.HStoreFile;
5857
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
5958
import org.apache.hadoop.hbase.regionserver.StoreUtils;
59+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
60+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
6061
import org.apache.hadoop.hbase.util.Bytes;
6162
import org.apache.hadoop.hbase.util.CommonFSUtils;
6263
import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -588,30 +589,35 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
588589
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
589590
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
590591
final FileSystem fs = mfs.getFileSystem();
591-
592+
List<Path> mergedFiles = new ArrayList<>();
592593
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
593594
env.getMasterConfiguration(), fs, tableDir, mergedRegion);
594595

595596
for (RegionInfo ri: this.regionsToMerge) {
596597
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
597598
env.getMasterConfiguration(), fs, tableDir, ri, false);
598-
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
599+
mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
599600
}
600601
assert mergeRegionFs != null;
601-
mergeRegionFs.commitMergedRegion();
602+
mergeRegionFs.commitMergedRegion(mergedFiles, env);
602603

603604
// Prepare to create merged regions
604605
env.getAssignmentManager().getRegionStates().
605606
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
606607
}
607608

608-
private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
609+
private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
609610
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
610611
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
611612
.get(mergedRegion.getTable());
613+
List<Path> mergedFiles = new ArrayList<>();
612614
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
613615
String family = hcd.getNameAsString();
614-
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
616+
Configuration trackerConfig =
617+
StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
618+
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
619+
family, regionFs);
620+
final Collection<StoreFileInfo> storeFiles = tracker.load();
615621
if (storeFiles != null && storeFiles.size() > 0) {
616622
final Configuration storeConfiguration =
617623
StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
@@ -623,11 +629,13 @@ private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
623629
// is running in a regionserver's Store context, or we might not be able
624630
// to read the hfiles.
625631
storeFileInfo.setConf(storeConfiguration);
626-
mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
632+
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
627633
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
634+
mergedFiles.add(refFile);
628635
}
629636
}
630637
}
638+
return mergedFiles;
631639
}
632640

633641
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.Future;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.stream.Stream;
36-
3736
import org.apache.hadoop.conf.Configuration;
3837
import org.apache.hadoop.fs.FileSystem;
3938
import org.apache.hadoop.fs.Path;
@@ -66,6 +65,8 @@
6665
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
6766
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
6867
import org.apache.hadoop.hbase.regionserver.StoreUtils;
68+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
69+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
6970
import org.apache.hadoop.hbase.util.Bytes;
7071
import org.apache.hadoop.hbase.util.CommonFSUtils;
7172
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -621,21 +622,20 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
621622
final FileSystem fs = mfs.getFileSystem();
622623
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
623624
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
624-
625625
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
626626

627-
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
627+
Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);
628628

629-
assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
629+
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
630630
regionFs.getSplitsDir(daughterOneRI));
631-
regionFs.commitDaughterRegion(daughterOneRI);
632-
assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
631+
regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
632+
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
633633
new Path(tabledir, daughterOneRI.getEncodedName()));
634634

635-
assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
635+
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
636636
regionFs.getSplitsDir(daughterTwoRI));
637-
regionFs.commitDaughterRegion(daughterTwoRI);
638-
assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
637+
regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
638+
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
639639
new Path(tabledir, daughterTwoRI.getEncodedName()));
640640
}
641641

@@ -652,7 +652,7 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
652652
* Create Split directory
653653
* @param env MasterProcedureEnv
654654
*/
655-
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
655+
private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
656656
final HRegionFileSystem regionFs) throws IOException {
657657
final Configuration conf = env.getMasterConfiguration();
658658
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
@@ -668,7 +668,11 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
668668
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
669669
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
670670
String family = cfd.getNameAsString();
671-
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
671+
Configuration trackerConfig = StoreFileTrackerFactory.
672+
mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
673+
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
674+
family, regionFs);
675+
Collection<StoreFileInfo> sfis = tracker.load();
672676
if (sfis == null) {
673677
continue;
674678
}
@@ -694,7 +698,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
694698
}
695699
if (nbFiles == 0) {
696700
// no file needs to be splitted.
697-
return new Pair<Integer, Integer>(0, 0);
701+
return new Pair<>(Collections.emptyList(), Collections.emptyList());
698702
}
699703
// Max #threads is the smaller of the number of storefiles or the default max determined above.
700704
int maxThreads = Math.min(
@@ -752,14 +756,18 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
752756
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
753757
}
754758

755-
int daughterA = 0;
756-
int daughterB = 0;
759+
List<Path> daughterA = new ArrayList<>();
760+
List<Path> daughterB = new ArrayList<>();
757761
// Look for any exception
758762
for (Future<Pair<Path, Path>> future : futures) {
759763
try {
760764
Pair<Path, Path> p = future.get();
761-
daughterA += p.getFirst() != null ? 1 : 0;
762-
daughterB += p.getSecond() != null ? 1 : 0;
765+
if(p.getFirst() != null){
766+
daughterA.add(p.getFirst());
767+
}
768+
if(p.getSecond() != null){
769+
daughterB.add(p.getSecond());
770+
}
763771
} catch (InterruptedException e) {
764772
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
765773
} catch (ExecutionException e) {
@@ -772,7 +780,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
772780
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
773781
" storefiles, Daughter B: " + daughterB + " storefiles.");
774782
}
775-
return new Pair<Integer, Integer>(daughterA, daughterB);
783+
return new Pair<>(daughterA, daughterB);
776784
}
777785

778786
private void assertSplitResultFilesCount(final FileSystem fs,

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.io.InterruptedIOException;
2525
import java.util.ArrayList;
2626
import java.util.Collection;
27+
import java.util.HashMap;
2728
import java.util.List;
29+
import java.util.Map;
2830
import java.util.Objects;
2931
import java.util.Optional;
3032
import java.util.UUID;
@@ -49,6 +51,9 @@
4951
import org.apache.hadoop.hbase.fs.HFileSystem;
5052
import org.apache.hadoop.hbase.io.HFileLink;
5153
import org.apache.hadoop.hbase.io.Reference;
54+
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
55+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
56+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
5257
import org.apache.hadoop.hbase.util.Bytes;
5358
import org.apache.hadoop.hbase.util.CommonFSUtils;
5459
import org.apache.hadoop.hbase.util.FSUtils;
@@ -595,19 +600,46 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
595600
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
596601
* @throws IOException
597602
*/
598-
public Path commitDaughterRegion(final RegionInfo regionInfo)
599-
throws IOException {
603+
public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
604+
MasterProcedureEnv env) throws IOException {
600605
Path regionDir = this.getSplitsDir(regionInfo);
601606
if (fs.exists(regionDir)) {
602607
// Write HRI to a file in case we need to recover hbase:meta
603608
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
604609
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
605610
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
611+
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
612+
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
613+
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
606614
}
607-
608615
return regionDir;
609616
}
610617

618+
private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
619+
HRegionFileSystem regionFs) throws IOException {
620+
TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
621+
get(regionInfo.getTable());
622+
//we need to map trackers per store
623+
Map<String, StoreFileTracker> trackerMap = new HashMap<>();
624+
//we need to map store files per store
625+
Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
626+
for(Path file : allFiles) {
627+
String familyName = file.getParent().getName();
628+
trackerMap.computeIfAbsent(familyName, t -> {
629+
Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
630+
tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
631+
return StoreFileTrackerFactory.
632+
create(config, true, familyName, regionFs);
633+
});
634+
fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
635+
List<StoreFileInfo> infos = fileInfoMap.get(familyName);
636+
infos.add(new StoreFileInfo(conf, fs, file, true));
637+
}
638+
for(Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
639+
entry.getValue().add(fileInfoMap.get(entry.getKey()));
640+
}
641+
}
642+
611643
/**
612644
* Creates region split daughter directories under the table dir. If the daughter regions already
613645
* exist, for example, in the case of a recovery from a previous failed split procedure, this
@@ -795,13 +827,15 @@ public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFi
795827
* Commit a merged region, making it ready for use.
796828
* @throws IOException
797829
*/
798-
public void commitMergedRegion() throws IOException {
830+
public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
831+
throws IOException {
799832
Path regionDir = getMergesDir(regionInfoForFs);
800833
if (regionDir != null && fs.exists(regionDir)) {
801834
// Write HRI to a file in case we need to recover hbase:meta
802835
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
803836
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
804837
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
838+
insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
805839
}
806840
}
807841

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collection;
2222
import java.util.List;
2323
import org.apache.hadoop.conf.Configuration;
24+
2425
import org.apache.hadoop.hbase.regionserver.StoreContext;
2526
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
2627
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,8 +33,7 @@
3233
@InterfaceAudience.Private
3334
class DefaultStoreFileTracker extends StoreFileTrackerBase {
3435

35-
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
36-
StoreContext ctx) {
36+
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
3737
super(conf, isPrimaryReplica, ctx);
3838
}
3939

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
*/
4949
@InterfaceAudience.Private
5050
public interface StoreFileTracker {
51-
5251
/**
5352
* Load the store files list when opening a region.
5453
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,51 @@
1818
package org.apache.hadoop.hbase.regionserver.storefiletracker;
1919

2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.hbase.CompoundConfiguration;
22+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
23+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
24+
import org.apache.hadoop.hbase.client.TableDescriptor;
25+
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
2126
import org.apache.hadoop.hbase.regionserver.StoreContext;
27+
import org.apache.hadoop.hbase.util.Bytes;
2228
import org.apache.hadoop.hbase.util.ReflectionUtils;
2329
import org.apache.yetus.audience.InterfaceAudience;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
2432

2533
/**
2634
* Factory method for creating store file tracker.
2735
*/
2836
@InterfaceAudience.Private
2937
public final class StoreFileTrackerFactory {
30-
3138
public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
39+
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);
3240

3341
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
34-
StoreContext ctx) {
42+
StoreContext ctx) {
3543
Class<? extends StoreFileTracker> tracker =
3644
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
45+
LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
3746
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
3847
}
48+
49+
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
50+
HRegionFileSystem regionFs) {
51+
ColumnFamilyDescriptorBuilder fDescBuilder =
52+
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
53+
StoreContext ctx = StoreContext.getBuilder().
54+
withColumnFamilyDescriptor(fDescBuilder.build()).
55+
withRegionFileSystem(regionFs).
56+
build();
57+
return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
58+
}
59+
60+
public static Configuration mergeConfigurations(Configuration global,
61+
TableDescriptor table, ColumnFamilyDescriptor family) {
62+
return new CompoundConfiguration()
63+
.add(global)
64+
.addBytesMap(table.getValues())
65+
.addStringMap(family.getConfiguration())
66+
.addBytesMap(family.getValues());
67+
}
3968
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public void testCustomParts() throws Exception {
6767
DummyStoreFlusher.class.getName());
6868
HRegion mockRegion = Mockito.mock(HRegion.class);
6969
HStore mockStore = Mockito.mock(HStore.class);
70+
mockStore.conf = conf;
7071
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
7172
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
7273
StoreEngine<?, ?, ?, ?> se =

0 commit comments

Comments (0)