Skip to content

Commit cb35f18

Browse files
wchevreuilApache9
authored andcommitted
HBASE-26079 Use StoreFileTracker when splitting and merging (#3617)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent dd7949f commit cb35f18

File tree

12 files changed

+473
-42
lines changed

12 files changed

+473
-42
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Collections;
2525
import java.util.List;
2626
import java.util.stream.Stream;
27+
28+
import org.apache.hadoop.conf.Configuration;
2729
import org.apache.hadoop.fs.FileSystem;
2830
import org.apache.hadoop.fs.Path;
2931
import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -53,6 +55,8 @@
5355
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
5456
import org.apache.hadoop.hbase.regionserver.HStoreFile;
5557
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
58+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
59+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
5660
import org.apache.hadoop.hbase.util.Bytes;
5761
import org.apache.hadoop.hbase.util.CommonFSUtils;
5862
import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -584,40 +588,47 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
584588
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
585589
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
586590
final FileSystem fs = mfs.getFileSystem();
587-
591+
List<Path> mergedFiles = new ArrayList<>();
588592
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
589593
env.getMasterConfiguration(), fs, tableDir, mergedRegion);
590594

591595
for (RegionInfo ri: this.regionsToMerge) {
592596
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
593597
env.getMasterConfiguration(), fs, tableDir, ri, false);
594-
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
598+
mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
595599
}
596600
assert mergeRegionFs != null;
597-
mergeRegionFs.commitMergedRegion();
601+
mergeRegionFs.commitMergedRegion(mergedFiles, env);
598602

599603
// Prepare to create merged regions
600604
env.getAssignmentManager().getRegionStates().
601605
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
602606
}
603607

604-
private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
608+
private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
605609
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
606610
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
607611
.get(mergedRegion.getTable());
612+
List<Path> mergedFiles = new ArrayList<>();
608613
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
609614
String family = hcd.getNameAsString();
610-
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
615+
Configuration trackerConfig =
616+
StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
617+
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
618+
family, regionFs);
619+
final Collection<StoreFileInfo> storeFiles = tracker.load();
611620
if (storeFiles != null && storeFiles.size() > 0) {
612621
for (StoreFileInfo storeFileInfo : storeFiles) {
613622
// Create reference file(s) to parent region file here in mergedDir.
614623
// As this procedure is running on master, use CacheConfig.DISABLED means
615624
// don't cache any block.
616-
mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
625+
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
617626
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
627+
mergedFiles.add(refFile);
618628
}
619629
}
620630
}
631+
return mergedFiles;
621632
}
622633

623634
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
6565
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
6666
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
67+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
68+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
6769
import org.apache.hadoop.hbase.util.Bytes;
6870
import org.apache.hadoop.hbase.util.CommonFSUtils;
6971
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -618,21 +620,20 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
618620
final FileSystem fs = mfs.getFileSystem();
619621
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
620622
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
621-
622623
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
623624

624-
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
625+
Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);
625626

626-
assertReferenceFileCount(fs, expectedReferences.getFirst(),
627+
assertReferenceFileCount(fs, expectedReferences.getFirst().size(),
627628
regionFs.getSplitsDir(daughterOneRI));
628-
regionFs.commitDaughterRegion(daughterOneRI);
629-
assertReferenceFileCount(fs, expectedReferences.getFirst(),
629+
regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
630+
assertReferenceFileCount(fs, expectedReferences.getFirst().size(),
630631
new Path(tabledir, daughterOneRI.getEncodedName()));
631632

632-
assertReferenceFileCount(fs, expectedReferences.getSecond(),
633+
assertReferenceFileCount(fs, expectedReferences.getSecond().size(),
633634
regionFs.getSplitsDir(daughterTwoRI));
634-
regionFs.commitDaughterRegion(daughterTwoRI);
635-
assertReferenceFileCount(fs, expectedReferences.getSecond(),
635+
regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
636+
assertReferenceFileCount(fs, expectedReferences.getSecond().size(),
636637
new Path(tabledir, daughterTwoRI.getEncodedName()));
637638
}
638639

@@ -649,7 +650,7 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
649650
* Create Split directory
650651
* @param env MasterProcedureEnv
651652
*/
652-
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
653+
private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
653654
final HRegionFileSystem regionFs) throws IOException {
654655
final Configuration conf = env.getMasterConfiguration();
655656
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
@@ -665,7 +666,11 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
665666
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
666667
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
667668
String family = cfd.getNameAsString();
668-
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
669+
Configuration trackerConfig = StoreFileTrackerFactory.
670+
mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
671+
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
672+
family, regionFs);
673+
Collection<StoreFileInfo> sfis = tracker.load();
669674
if (sfis == null) {
670675
continue;
671676
}
@@ -691,7 +696,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
691696
}
692697
if (nbFiles == 0) {
693698
// no file needs to be splitted.
694-
return new Pair<Integer, Integer>(0, 0);
699+
return new Pair<>(Collections.emptyList(), Collections.emptyList());
695700
}
696701
// Max #threads is the smaller of the number of storefiles or the default max determined above.
697702
int maxThreads = Math.min(
@@ -744,14 +749,18 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
744749
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
745750
}
746751

747-
int daughterA = 0;
748-
int daughterB = 0;
752+
List<Path> daughterA = new ArrayList<>();
753+
List<Path> daughterB = new ArrayList<>();
749754
// Look for any exception
750755
for (Future<Pair<Path, Path>> future : futures) {
751756
try {
752757
Pair<Path, Path> p = future.get();
753-
daughterA += p.getFirst() != null ? 1 : 0;
754-
daughterB += p.getSecond() != null ? 1 : 0;
758+
if(p.getFirst() != null){
759+
daughterA.add(p.getFirst());
760+
}
761+
if(p.getSecond() != null){
762+
daughterB.add(p.getSecond());
763+
}
755764
} catch (InterruptedException e) {
756765
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
757766
} catch (ExecutionException e) {
@@ -764,7 +773,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
764773
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
765774
" storefiles, Daughter B: " + daughterB + " storefiles.");
766775
}
767-
return new Pair<Integer, Integer>(daughterA, daughterB);
776+
return new Pair<>(daughterA, daughterB);
768777
}
769778

770779
private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.io.InterruptedIOException;
2424
import java.util.ArrayList;
2525
import java.util.Collection;
26+
import java.util.HashMap;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Objects;
2830
import java.util.Optional;
2931
import java.util.UUID;
@@ -45,6 +47,9 @@
4547
import org.apache.hadoop.hbase.client.TableDescriptor;
4648
import org.apache.hadoop.hbase.fs.HFileSystem;
4749
import org.apache.hadoop.hbase.io.Reference;
50+
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
51+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
52+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
4853
import org.apache.hadoop.hbase.util.Bytes;
4954
import org.apache.hadoop.hbase.util.CommonFSUtils;
5055
import org.apache.hadoop.hbase.util.FSUtils;
@@ -592,19 +597,46 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
592597
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
593598
* @throws IOException
594599
*/
595-
public Path commitDaughterRegion(final RegionInfo regionInfo)
596-
throws IOException {
600+
public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
601+
MasterProcedureEnv env) throws IOException {
597602
Path regionDir = this.getSplitsDir(regionInfo);
598603
if (fs.exists(regionDir)) {
599604
// Write HRI to a file in case we need to recover hbase:meta
600605
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
601606
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
602607
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
608+
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
609+
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
610+
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
603611
}
604-
605612
return regionDir;
606613
}
607614

615+
private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
616+
HRegionFileSystem regionFs) throws IOException {
617+
TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
618+
get(regionInfo.getTable());
619+
//we need to map trackers per store
620+
Map<String, StoreFileTracker> trackerMap = new HashMap<>();
621+
//we need to map store files per store
622+
Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
623+
for(Path file : allFiles) {
624+
String familyName = file.getParent().getName();
625+
trackerMap.computeIfAbsent(familyName, t -> {
626+
Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
627+
tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
628+
return StoreFileTrackerFactory.
629+
create(config, true, familyName, regionFs);
630+
});
631+
fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
632+
List<StoreFileInfo> infos = fileInfoMap.get(familyName);
633+
infos.add(new StoreFileInfo(conf, fs, file, true));
634+
}
635+
for(Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
636+
entry.getValue().add(fileInfoMap.get(entry.getKey()));
637+
}
638+
}
639+
608640
/**
609641
* Creates region split daughter directories under the table dir. If the daughter regions already
610642
* exist, for example, in the case of a recovery from a previous failed split procedure, this
@@ -755,13 +787,15 @@ public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFi
755787
* Commit a merged region, making it ready for use.
756788
* @throws IOException
757789
*/
758-
public void commitMergedRegion() throws IOException {
790+
public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
791+
throws IOException {
759792
Path regionDir = getMergesDir(regionInfoForFs);
760793
if (regionDir != null && fs.exists(regionDir)) {
761794
// Write HRI to a file in case we need to recover hbase:meta
762795
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
763796
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
764797
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
798+
insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
765799
}
766800
}
767801

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collection;
2222
import java.util.List;
2323
import org.apache.hadoop.conf.Configuration;
24+
2425
import org.apache.hadoop.hbase.regionserver.StoreContext;
2526
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
2627
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,8 +33,7 @@
3233
@InterfaceAudience.Private
3334
class DefaultStoreFileTracker extends StoreFileTrackerBase {
3435

35-
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
36-
StoreContext ctx) {
36+
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
3737
super(conf, isPrimaryReplica, ctx);
3838
}
3939

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
*/
4949
@InterfaceAudience.Private
5050
public interface StoreFileTracker {
51-
5251
/**
5352
* Load the store files list when opening a region.
5453
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,51 @@
1818
package org.apache.hadoop.hbase.regionserver.storefiletracker;
1919

2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.hbase.CompoundConfiguration;
22+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
23+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
24+
import org.apache.hadoop.hbase.client.TableDescriptor;
25+
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
2126
import org.apache.hadoop.hbase.regionserver.StoreContext;
27+
import org.apache.hadoop.hbase.util.Bytes;
2228
import org.apache.hadoop.hbase.util.ReflectionUtils;
2329
import org.apache.yetus.audience.InterfaceAudience;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
2432

2533
/**
2634
* Factory method for creating store file tracker.
2735
*/
2836
@InterfaceAudience.Private
2937
public final class StoreFileTrackerFactory {
30-
3138
public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
39+
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);
3240

3341
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
34-
StoreContext ctx) {
42+
StoreContext ctx) {
3543
Class<? extends StoreFileTracker> tracker =
3644
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
45+
LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
3746
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
3847
}
48+
49+
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
50+
HRegionFileSystem regionFs) {
51+
ColumnFamilyDescriptorBuilder fDescBuilder =
52+
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
53+
StoreContext ctx = StoreContext.getBuilder().
54+
withColumnFamilyDescriptor(fDescBuilder.build()).
55+
withRegionFileSystem(regionFs).
56+
build();
57+
return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
58+
}
59+
60+
public static Configuration mergeConfigurations(Configuration global,
61+
TableDescriptor table, ColumnFamilyDescriptor family) {
62+
return new CompoundConfiguration()
63+
.add(global)
64+
.addBytesMap(table.getValues())
65+
.addStringMap(family.getConfiguration())
66+
.addBytesMap(family.getValues());
67+
}
3968
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public void testCustomParts() throws Exception {
6767
DummyStoreFlusher.class.getName());
6868
HRegion mockRegion = Mockito.mock(HRegion.class);
6969
HStore mockStore = Mockito.mock(HStore.class);
70+
mockStore.conf = conf;
7071
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
7172
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
7273
StoreEngine<?, ?, ?, ?> se =

0 commit comments

Comments
 (0)