HBASE-26640 Reimplement master local region initialization to better … #4111

Merged · 1 commit · Feb 24, 2022
Changes from all commits
HMaster.java

@@ -799,7 +799,7 @@ private void tryMigrateMetaLocationsFromZooKeeper() throws IOException, KeeperEx
       // done with in a one row put, which means if we have data in catalog family then we can
       // make sure that the migration is done.
       LOG.info("The {} family in master local region already has data in it, skip migrating...",
-        HConstants.CATALOG_FAMILY);
+        HConstants.CATALOG_FAMILY_STR);
       return;
     }
   }
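Note: HConstants.CATALOG_FAMILY is the byte[] form of the family name, while CATALOG_FAMILY_STR is the String constant, so the old line handed a byte[] to an SLF4J {} placeholder. A minimal sketch of the difference, assuming SLF4J's default array rendering (not HBase code):

import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FamilyLogDemo {
  private static final Logger LOG = LoggerFactory.getLogger(FamilyLogDemo.class);

  public static void main(String[] args) {
    byte[] catalogFamily = "info".getBytes(StandardCharsets.UTF_8); // shape of CATALOG_FAMILY
    String catalogFamilyStr = "info";                               // shape of CATALOG_FAMILY_STR
    // With the byte[] argument SLF4J renders the element values,
    // something like "The [105, 110, 102, 111] family ...", not the name.
    LOG.info("The {} family in master local region already has data in it", catalogFamily);
    // With the String argument the message reads "The info family ...".
    LOG.info("The {} family in master local region already has data in it", catalogFamilyStr);
  }
}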
@@ -4077,7 +4077,7 @@ public MetaLocationSyncer getMetaLocationSyncer() {
 
   @RestrictedApi(explanation = "Should only be called in tests", link = "",
     allowedOnPath = ".*/src/test/.*")
-  MasterRegion getMasterRegion() {
+  public MasterRegion getMasterRegion() {
     return masterRegion;
   }
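Note: the widening to public matters because tests in other packages cannot see a package-private method, while the @RestrictedApi annotation (from error_prone_annotations, enforced when the Error Prone check runs) still limits callers to files matching allowedOnPath. A hypothetical test-side caller:

// Hypothetical test class; it lives under src/test/, so the
// allowedOnPath pattern ".*/src/test/.*" permits this call.
public class TestMasterRegionAccess {

  public void verifyMasterRegionLoaded(HMaster master) {
    // Without the public modifier this would not compile from another package;
    // with @RestrictedApi, a caller under src/main would fail the Error Prone check.
    MasterRegion masterRegion = master.getMasterRegion();
    assert masterRegion != null;
  }
}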

MasterRegion.java

@@ -20,27 +20,34 @@
 import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
 
 import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
@@ -92,6 +99,10 @@ public final class MasterRegion {
 
   private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
 
+  static final String INITIALIZING_FLAG = ".initializing";
+
+  static final String INITIALIZED_FLAG = ".initialized";
+
   private static final int REGION_ID = 1;
 
   private final WALFactory walFactory;
@@ -196,32 +207,37 @@ private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRol
 
   private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
     Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
-    MasterRegionWALRoller walRoller, String serverName) throws IOException {
+    MasterRegionWALRoller walRoller, String serverName, boolean touchInitializingFlag)
+    throws IOException {
     TableName tn = td.getTableName();
     RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
-    Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
-      TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
-    if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
-      throw new IOException("Can not delete partial created proc region " + tmpTableDir);
-    }
-    HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
     Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
-    if (!fs.rename(tmpTableDir, tableDir)) {
-      throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
+    // persist table descriptor
+    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, true);
+    HRegion.createHRegion(conf, regionInfo, fs, tableDir, td).close();
+    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
+    if (!fs.mkdirs(initializedFlag)) {
+      throw new IOException("Can not touch initialized flag: " + initializedFlag);
     }
+    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
+    if (!fs.delete(initializingFlag, true)) {
+      LOG.warn("failed to clean up initializing flag: " + initializingFlag);
+    }
     WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
     return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
   }
 
-  private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
-    FileSystem walFs, Path walRootDir, WALFactory walFactory, MasterRegionWALRoller walRoller,
-    String serverName) throws IOException {
-    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+  private static RegionInfo loadRegionInfo(FileSystem fs, Path tableDir) throws IOException {
     Path regionDir =
       fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
         .getPath();
-    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+    return HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+  }
+
+  private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo regionInfo,
+    FileSystem fs, Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
+    MasterRegionWALRoller walRoller, String serverName) throws IOException {
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
     Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
     Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
     if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
@@ -287,6 +303,39 @@ private static void replayWALs(Configuration conf, FileSystem walFs, Path walRoo
     }
   }
 
+  private static void tryMigrate(Configuration conf, FileSystem fs, Path tableDir,
+    RegionInfo regionInfo, TableDescriptor oldTd, TableDescriptor newTd) throws IOException {
+    Class<? extends StoreFileTracker> oldSft =
+      StoreFileTrackerFactory.getTrackerClass(oldTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+    Class<? extends StoreFileTracker> newSft =
+      StoreFileTrackerFactory.getTrackerClass(newTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+    if (oldSft.equals(newSft)) {
+      LOG.debug("old store file tracker {} is the same with new store file tracker, skip migration",
+        StoreFileTrackerFactory.getStoreFileTrackerName(oldSft));
+      if (!oldTd.equals(newTd)) {
+        // we may change other things such as adding a new family, so here we still need to persist
+        // the new table descriptor
+        LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
+        FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
+      }
+      return;
+    }
+    LOG.info("Migrate store file tracker from {} to {}", oldSft.getSimpleName(),
+      newSft.getSimpleName());
+    HRegionFileSystem hfs =
+      HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, regionInfo, false);
+    for (ColumnFamilyDescriptor oldCfd : oldTd.getColumnFamilies()) {
+      StoreFileTracker oldTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
+      StoreFileTracker newTracker = StoreFileTrackerFactory.create(conf, newTd, oldCfd, hfs);
+      List<StoreFileInfo> files = oldTracker.load();
+      LOG.debug("Store file list for {}: {}", oldCfd.getNameAsString(), files);
+      newTracker.set(files);
+    }
+    // persist the new table descriptor after migration
+    LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
+    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
+  }

   public static MasterRegion create(MasterRegionParams params) throws IOException {
     TableDescriptor td = params.tableDescriptor();
     LOG.info("Create or load local region for table " + td);
@@ -321,16 +370,58 @@ public static MasterRegion create(MasterRegionParams params) throws IOException

     WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
     Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
+    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
     HRegion region;
-    if (fs.exists(tableDir)) {
-      // load the existing region.
-      region = open(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
-        server.getServerName().toString());
-    } else {
-      // bootstrapping...
+    if (!fs.exists(tableDir)) {
+      // bootstrap, no doubt
+      if (!fs.mkdirs(initializingFlag)) {
+        throw new IOException("Can not touch initializing flag: " + initializingFlag);
+      }
       region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
-        server.getServerName().toString());
+        server.getServerName().toString(), true);
+    } else {
+      if (!fs.exists(initializedFlag)) {
+        if (!fs.exists(initializingFlag)) {
+          // should be old style, where we do not have the initializing or initialized file, persist
+          // the table descriptor, touch the initialized flag and then open the region.
+          // the store file tracker must be DEFAULT
+          LOG.info("No {} or {} file, try upgrading", INITIALIZING_FLAG, INITIALIZED_FLAG);
+          TableDescriptor oldTd =
+            TableDescriptorBuilder.newBuilder(td).setValue(StoreFileTrackerFactory.TRACKER_IMPL,
+              StoreFileTrackerFactory.Trackers.DEFAULT.name()).build();
+          FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, oldTd, true);
+          if (!fs.mkdirs(initializedFlag)) {
+            throw new IOException("Can not touch initialized flag: " + initializedFlag);
+          }
+          RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
+          tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
+          region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+            server.getServerName().toString());
+        } else {
+          // delete all contents besides the initializing flag, here we can make sure tableDir
+          // exists(unless someone delete it manually...), so we do not do null check here.
+          for (FileStatus status : fs.listStatus(tableDir)) {
+            if (!status.getPath().getName().equals(INITIALIZING_FLAG)) {
+              fs.delete(status.getPath(), true);
+            }
+          }
+          region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+            server.getServerName().toString(), false);
+        }
+      } else {
+        if (fs.exists(initializingFlag) && !fs.delete(initializingFlag, true)) {
+          LOG.warn("failed to clean up initializing flag: " + initializingFlag);
+        }
+        // open it, make sure to load the table descriptor from fs
+        TableDescriptor oldTd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
+        RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
+        tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
+        region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+          server.getServerName().toString());
+      }
     }
 
     Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
     MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
       server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
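Taken together, create() is now a small crash-safe state machine over the two marker files. A condensed paraphrase of the logic above (the helper names are hypothetical, not the real methods):

// Condensed paraphrase of MasterRegion.create(); bootstrapFresh() etc. are
// hypothetical names for the branches shown in the diff above.
if (!fs.exists(tableDir)) {
  // Never created: touch .initializing, then bootstrap; bootstrap() itself
  // touches .initialized and removes .initializing once the region is complete.
  bootstrapFresh();
} else if (fs.exists(initializedFlag)) {
  // Fully initialized: clean up any stray .initializing flag, reload the
  // descriptor from the filesystem, migrate the store file tracker if the
  // configured one differs, then open.
  migrateIfNeededAndOpen();
} else if (fs.exists(initializingFlag)) {
  // Crashed mid-bootstrap: wipe everything except .initializing and redo
  // the bootstrap from the beginning.
  wipeAndRebootstrap();
} else {
  // Neither flag: a layout written by an older release. Persist a descriptor
  // (tracker must be DEFAULT), touch .initialized, migrate if needed, open.
  upgradeAndOpen();
}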
MasterRegionFactory.java

@@ -28,7 +28,10 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -78,6 +81,8 @@ public final class MasterRegionFactory {

   private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
 
+  public static final String TRACKER_IMPL = "hbase.master.store.region.file-tracker.impl";
+
   public static final TableName TABLE_NAME = TableName.valueOf("master:store");
 
   public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
@@ -89,10 +94,23 @@ public final class MasterRegionFactory {
       .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
     .setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
 
+  private static TableDescriptor withTrackerConfigs(Configuration conf) {
+    String trackerImpl = conf.get(TRACKER_IMPL, conf.get(StoreFileTrackerFactory.TRACKER_IMPL,
+      StoreFileTrackerFactory.Trackers.DEFAULT.name()));
+    Class<? extends StoreFileTracker> trackerClass =
+      StoreFileTrackerFactory.getTrackerClass(trackerImpl);
+    if (StoreFileTrackerFactory.isMigration(trackerClass)) {
+      throw new IllegalArgumentException("Should not set store file tracker to " +
+        StoreFileTrackerFactory.Trackers.MIGRATION.name() + " for master local region");
+    }
+    StoreFileTracker tracker = ReflectionUtils.newInstance(trackerClass, conf, true, null);
+    return tracker.updateWithTrackerConfigs(TableDescriptorBuilder.newBuilder(TABLE_DESC)).build();
+  }
Review thread on lines +97 to +108 (withTrackerConfigs):

Contributor: Could this maybe go into one of the SFT-related classes, like the SFT factory or the SFT validation util? I think we had already done some similar validation elsewhere.

Contributor (author): It is a bit different here. The other SFT-related classes all follow the same pattern: construct a new Configuration instance from the TableDescriptor and ColumnFamilyDescriptor, i.e., they have to process per family, not only per table. Here, though, we know there are no SFT-related configurations in the TableDescriptor or ColumnFamilyDescriptor; we just want to add one to the TableDescriptor, so I copied part of that code and modified it accordingly.

   public static MasterRegion create(Server server) throws IOException {
-    MasterRegionParams params = new MasterRegionParams().server(server)
-      .regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
     Configuration conf = server.getConfiguration();
+    MasterRegionParams params = new MasterRegionParams().server(server)
+      .regionDirName(MASTER_STORE_DIR).tableDescriptor(withTrackerConfigs(conf));
     long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
     long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
     long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
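Usage note: the new hbase.master.store.region.file-tracker.impl key lets the master local region's tracker diverge from the cluster-wide hbase.store.file-tracker.impl (the value of StoreFileTrackerFactory.TRACKER_IMPL); when the dedicated key is absent, withTrackerConfigs falls back to the general one. A sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MasterStoreTrackerConfigSketch {
  public static Configuration example() {
    Configuration conf = HBaseConfiguration.create();
    // Cluster-wide tracker used for ordinary tables.
    conf.set("hbase.store.file-tracker.impl", "FILE");
    // Dedicated override for the master local region; if this key is unset,
    // the cluster-wide value above is used as the fallback.
    conf.set("hbase.master.store.region.file-tracker.impl", "DEFAULT");
    // Setting "MIGRATION" here would be rejected by withTrackerConfigs
    // with an IllegalArgumentException.
    return conf;
  }
}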
StoreFileTrackerFactory.java

@@ -84,7 +84,7 @@ public static String getStoreFileTrackerName(Configuration conf) {
     return conf.get(TRACKER_IMPL, Trackers.DEFAULT.name());
   }
 
-  static String getStoreFileTrackerName(Class<? extends StoreFileTracker> clazz) {
+  public static String getStoreFileTrackerName(Class<? extends StoreFileTracker> clazz) {
     Trackers name = CLASS_TO_ENUM.get(clazz);
     return name != null ? name.name() : clazz.getName();
   }
@@ -184,4 +184,8 @@ public static TableDescriptor updateWithTrackerConfigs(Configuration conf,
     }
     return descriptor;
   }
+
+  public static boolean isMigration(Class<?> clazz) {
+    return MigrationStoreFileTracker.class.isAssignableFrom(clazz);
+  }
 }
FSTableDescriptors.java

@@ -520,6 +520,13 @@ private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFro
     return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
+    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
+    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
+  }
+
   /**
    * Deletes files matching the table info file pattern within the given directory whose sequenceId
    * is at most the given max sequenceId.
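Note: this helper is test-only (enforced by @RestrictedApi); presumably it lets tests strip the persisted descriptor and recreate the pre-flag, "old style" layout that the upgrade branch in MasterRegion.create() handles. A hypothetical fragment:

// Hypothetical test fragment: removing the on-disk descriptor (together with
// the marker files) would force the next MasterRegion.create() down the
// "No .initializing or .initialized file, try upgrading" branch.
// masterRegionRootDir is an assumed variable pointing at the local region root.
FileSystem fs = FileSystem.get(conf);
Path tableDir = CommonFSUtils.getTableDir(masterRegionRootDir, MasterRegionFactory.TABLE_NAME);
FSTableDescriptors.deleteTableDescriptors(fs, tableDir);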
MasterRegionTestBase.java

@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -87,20 +88,25 @@ public void setUp() throws IOException {
   /**
    * Creates a new MasterRegion using an existing {@code htu} on this class.
    */
-  protected void createMasterRegion() throws IOException {
-    configure(htu.getConfiguration());
+  protected final void createMasterRegion() throws IOException {
+    Configuration conf = htu.getConfiguration();
+    configure(conf);
     choreService = new ChoreService(getClass().getSimpleName());
-    hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
-    logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
+    hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
+    logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
     Server server = mock(Server.class);
-    when(server.getConfiguration()).thenReturn(htu.getConfiguration());
+    when(server.getConfiguration()).thenReturn(conf);
     when(server.getServerName())
       .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
     when(server.getChoreService()).thenReturn(choreService);
     Path testDir = htu.getDataTestDir();
-    CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
+    CommonFSUtils.setRootDir(conf, testDir);
     MasterRegionParams params = new MasterRegionParams();
-    params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
+    TableDescriptor td = TableDescriptorBuilder
+      .newBuilder(TD).setValue(StoreFileTrackerFactory.TRACKER_IMPL, conf
+        .get(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name()))
+      .build();
+    params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(td)
       .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
       .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
       .ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
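With the table descriptor now built from StoreFileTrackerFactory.TRACKER_IMPL in the configuration, a subclass can run the whole master-region test suite under a non-default tracker. A hypothetical subclass, assuming configure(Configuration) is the overridable hook the call above suggests:

// Hypothetical subclass: every test inherited from MasterRegionTestBase
// then runs the master local region with the FILE store file tracker.
public class TestMasterRegionWithFileTracker extends MasterRegionTestBase {

  @Override
  protected void configure(Configuration conf) {
    conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
      StoreFileTrackerFactory.Trackers.FILE.name());
  }
}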