HBASE-27733 When an HFile split occurs during bulkload, the new HFile does not specify favored nodes #5121

Merged: 9 commits, May 10, 2023
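
In short: when BulkLoadHFilesTool must split an input HFile at a region boundary, the two half-files used to be written without HDFS favored nodes, so their blocks could land on arbitrary datanodes and the bulk-loaded region lost locality. With this change, splitStoreFile takes the table's AsyncTableRegionLocator, resolves the region hosting the first cell of each half, and passes that region server to the StoreFileWriter as a favored node, mirroring what HFileOutputFormat2 already does (see HBASE-12596). The behavior is gated by hbase.bulkload.locality.sensitive.enabled (default: true).

The core of the technique, as a minimal standalone sketch; the helper class, its name, and its parameter list are illustrative, not the patch's exact shape:

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;

final class FavoredNodeWriterSketch {
  // Build a writer whose HDFS blocks are preferentially placed on the region
  // server that will serve the bulk-loaded file. Falls back to a plain writer
  // when the location is missing or unresolved, as the patch does.
  static StoreFileWriter newHalfWriter(Configuration conf, FileSystem fs, Path outFile,
      BloomType bloomType, HFileContext fileContext, HRegionLocation location)
      throws IOException {
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
        .withFilePath(outFile).withBloomType(bloomType).withFileContext(fileContext);
    if (location != null) {
      InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
      if (!isa.isUnresolved()) {
        // Favored nodes are handed down to the DFS output stream as placement hints.
        builder.withFavoredNodes(new InetSocketAddress[] { isa });
      }
    }
    return builder.build();
  }
}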
BulkLoadHFilesTool.java
@@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -56,13 +57,17 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -114,6 +119,13 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To

private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);

/**
* Keep locality while generating HFiles for bulkload. See HBASE-12596
*/
public static final String LOCALITY_SENSITIVE_CONF_KEY =
"hbase.bulkload.locality.sensitive.enabled";
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;

public static final String NAME = "completebulkload";
/**
* Whether to run validation on hfiles before loading.
@@ -540,7 +552,6 @@ private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase
Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
while (!queue.isEmpty()) {
final LoadQueueItem item = queue.remove();

final Callable<Pair<List<LoadQueueItem>, String>> call =
() -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
splittingFutures.add(pool.submit(call));
@@ -578,8 +589,8 @@ private String getUniqueName() {
return UUID.randomUUID().toString().replaceAll("-", "");
}

private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
byte[] splitKey) throws IOException {
private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
TableDescriptor tableDesc, byte[] splitKey) throws IOException {
Path hfilePath = item.getFilePath();
byte[] family = item.getFamily();
Path tmpDir = hfilePath.getParent();
@@ -594,7 +605,8 @@ private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor t

Path botOut = new Path(tmpDir, uniqueName + ".bottom");
Path topOut = new Path(tmpDir, uniqueName + ".top");
splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

FileSystem fs = tmpDir.getFileSystem(getConf());
fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -718,8 +730,9 @@ CacheConfig.DISABLED, true, getConf())) {
checkRegionIndexValid(splitIdx, startEndKeys, tableName);
}
byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
List<LoadQueueItem> lqis =
splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);

return new Pair<>(lqis, null);
}

@@ -729,25 +742,27 @@ CacheConfig.DISABLED, true, getConf())) {
}

/**
* Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
* filters, etc.
* Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
* recreating bloom filters, etc.
*/
@InterfaceAudience.Private
static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
throws IOException {
// Open reader with no block cache, and not in-memory
Reference topReference = Reference.createTopReference(splitKey);
Reference bottomReference = Reference.createBottomReference(splitKey);

copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
}

/**
* Copy half of an HFile into a new HFile.
* Copy half of an HFile into a new HFile with favored nodes.
*/
private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
throws IOException {
FileSystem fs = inFile.getFileSystem(conf);
CacheConfig cacheConf = CacheConfig.DISABLED;
HalfStoreFileReader halfReader = null;
@@ -769,12 +784,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
.withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();

HFileScanner scanner = halfReader.getScanner(false, false, false);
scanner.seekTo();
do {
halfWriter.append(scanner.getCell());
final Cell cell = scanner.getCell();
if (null != halfWriter) {
halfWriter.append(cell);
} else {

// Lazily initialize halfWriter: locate the region serving the first cell and,
// when locality is enabled, prefer its host as a favored node.
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
byte[] rowKey = CellUtil.cloneRow(cell);
HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
InetSocketAddress[] favoredNodes = null;
if (null == hRegionLocation) {
LOG.warn(
"Failed get region location for rowkey {} , Using writer without favoured nodes.",
Bytes.toString(rowKey));
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
} else {
LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
InetSocketAddress initialIsa =
new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
if (initialIsa.isUnresolved()) {
LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
hRegionLocation);
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
} else {
LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
favoredNodes = new InetSocketAddress[] { initialIsa };
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
}
}
} else {
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
}
halfWriter.append(cell);
}

} while (scanner.next());

for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
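A note on usage: the favored-nodes path is gated on the key introduced above, so callers that do not want locality hints can opt out. A minimal sketch; the class name is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class DisableBulkLoadLocality {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The key added above; defaults to true. Setting it to false restores the
    // old behavior of writing split halves without favored nodes.
    conf.setBoolean(BulkLoadHFilesTool.LOCALITY_SENSITIVE_CONF_KEY, false);
  }
}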
TestBulkLoadHFiles.java
@@ -26,6 +26,7 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,10 +44,12 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
@@ -63,7 +66,12 @@
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -555,15 +563,48 @@ public void testSplitStoreFile() throws IOException {
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
String tableName = tn.getMethodName();
util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");

BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
BulkLoadHFilesTool.splitStoreFile(
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);

int rowCount = verifyHFile(bottomOut);
rowCount += verifyHFile(topOut);
assertEquals(1000, rowCount);
}

/**
* Test HFile splits with favored nodes.
*/
@Test
public void testSplitStoreFileWithFavoriteNodes() throws IOException {

Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
FileSystem fs = util.getDFSCluster().getFileSystem();

Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
String tableName = tn.getMethodName();
Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");

final AsyncTableRegionLocator regionLocator =
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
verifyHFileFavoriteNode(topOut, regionLocator, fs);
verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
int rowCount = verifyHFile(bottomOut);
rowCount += verifyHFile(topOut);
assertEquals(1000, rowCount);
@@ -575,14 +616,17 @@ public void testSplitStoreFileWithCreateTimeTS() throws IOException {
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
String tableName = tn.getMethodName();
util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");

BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
BulkLoadHFilesTool.splitStoreFile(
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);

verifyHFileCreateTimeTS(bottomOut);
verifyHFileCreateTimeTS(topOut);
@@ -615,14 +659,17 @@ private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadE
Path testIn = new Path(dir, "testhfile");
ColumnFamilyDescriptor familyDesc =
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
String tableName = tn.getMethodName();
util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");

BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
BulkLoadHFilesTool.splitStoreFile(
util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);

int rowCount = verifyHFile(bottomOut);
rowCount += verifyHFile(topOut);
@@ -654,6 +701,61 @@ private void verifyHFileCreateTimeTS(Path p) throws IOException {
}
}

/**
* Verify that a split store file was written with favored-node block placement.
*/
private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
throws IOException {
Configuration conf = util.getConfiguration();

try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf)) {

final byte[] firstRowkey = reader.getFirstRowKey().get();
final HRegionLocation hRegionLocation =
FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));

final String targetHostName = hRegionLocation.getHostname();

if (fs instanceof DistributedFileSystem) {
String pathStr = p.toUri().getPath();
LocatedBlocks blocks =
((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);

boolean isFavoriteNode = false;
List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
int index = 0;
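// Walk every block of the written half: each block must have at least one
// replica on the region's host (the favored node). The flag computed for
// block N is asserted at the start of iteration N + 1, and once more after
// the loop so the last block is covered. The ternary below treats 127.0.0.1
// and the local hostname as aliases of the same mini-cluster node.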
do {
if (index > 0) {
assertTrue("failed use favored nodes", isFavoriteNode);
}
isFavoriteNode = false;
final LocatedBlock block = locatedBlocks.get(index);

final DatanodeInfo[] locations = block.getLocations();
for (DatanodeInfo location : locations) {

final String hostName = location.getHostName();
if (
targetHostName.equals(hostName.equals("127.0.0.1")
? InetAddress.getLocalHost().getHostName()
: "127.0.0.1") || targetHostName.equals(hostName)
) {
isFavoriteNode = true;
break;
}
}

index++;
} while (index < locatedBlocks.size());
if (index > 0) {
assertTrue("failed use favored nodes", isFavoriteNode);
}

}

}
}

private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
Integer value = map.containsKey(first) ? map.get(first) : 0;
map.put(first, value + 1);
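
Relatedly, the placement check the test performs can be reproduced against any file with the HDFS client API. A small illustrative sketch, not part of the patch:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

final class BlockPlacementSketch {
  // Print which datanodes hold each block of a file, e.g. to check by hand
  // whether favored-node hints were honored after a bulk load.
  static void printBlockHosts(DistributedFileSystem dfs, Path file) throws IOException {
    long len = dfs.getFileStatus(file).getLen();
    for (LocatedBlock block : dfs.getClient()
        .getLocatedBlocks(file.toUri().getPath(), 0L, len).getLocatedBlocks()) {
      for (DatanodeInfo dn : block.getLocations()) {
        System.out.println(block.getBlock() + " -> " + dn.getHostName());
      }
    }
  }
}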