HBASE-29382 The always.copy.files parameter does not take effect in some bulkload scenarios #7081

Open · wants to merge 1 commit into base: branch-2.5
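For context, `always.copy.files` tells the bulk-load client to copy its input hfiles into the staging area instead of moving them, so the caller's files survive the load. Below is a minimal usage sketch, assuming the `org.apache.hadoop.hbase.tool.LoadIncrementalHFiles` entry point and its `ALWAYS_COPY_FILES` constant as they exist on branch-2.5; the hfile directory and table name are hypothetical.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class AlwaysCopyFilesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ask the loader to copy, rather than move, the source hfiles into the
    // staging directory so they remain in place after the bulk load.
    conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);

    // Hypothetical HFileOutputFormat2 output directory and target table.
    int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
      new String[] { "/data/bulk/out", "my_table" });
    System.exit(exitCode);
  }
}
```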
@@ -107,6 +107,8 @@ public class SecureBulkLoadManager {
   private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
   private Connection conn;
 
+  static final String TMP_DIR = ".tmp";
+
   SecureBulkLoadManager(Configuration conf, Connection conn) {
     this.conf = conf;
     this.conn = conn;
@@ -390,7 +392,7 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
           LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
             + "the destination filesystem. Copying file over to destination staging dir.");
           FileUtil.copy(srcFs, p, fs, stageP, false, conf);
-        } else if (copyFile) {
+        } else if (copyFile && !p.getParent().getName().equals(TMP_DIR)) {
           LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
           FileUtil.copy(srcFs, p, fs, stageP, false, conf);
         } else {
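On the server side, `prepareBulkLoad` keeps copying ordinary input files when `copyFile` is set, but a file that already sits under the loader's `.tmp` split directory now falls through to the move branch: it is an artifact the client created while splitting, not one of the original input files (per the existing comment in `splitStoreFile`). Below is an illustrative restatement of the decision for the same-filesystem case, not the literal method body.

```java
import org.apache.hadoop.fs.Path;

/** Illustrative helper, not part of the patch: the copy-vs-move choice in prepareBulkLoad. */
final class CopyDecisionSketch {
  /** Mirrors the TMP_DIR constant added to SecureBulkLoadManager in this change. */
  static final String TMP_DIR = ".tmp";

  /**
   * For a source hfile on the same filesystem as the staging dir: copy it only when
   * copyFile is requested and the file is an original input, i.e. not a split
   * artifact living under ".tmp"; otherwise it is safe to move it.
   */
  static boolean copyIntoStaging(Path src, boolean copyFile) {
    boolean isSplitArtifact = src.getParent().getName().equals(TMP_DIR);
    return copyFile && !isSplitArtifact;
  }
}
```

The client-side hunks below make the same flag available where split files are cleaned up.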
@@ -409,7 +409,7 @@ public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> qu
     try {
       pool = createExecutorService();
       Multimap<ByteBuffer, LoadQueueItem> regionGroups =
-        groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
+        groupOrSplitPhase(table, pool, queue, startEndKeys, copyFile).getFirst();
       bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
     } finally {
       if (pool != null) {
@@ -451,7 +451,7 @@ private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
       count++;
 
       // Using ByteBuffer for byte[] equality semantics
-      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
+      pair = groupOrSplitPhase(table, pool, queue, startEndKeys, copyFile);
       Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
 
       if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
@@ -630,7 +630,7 @@ protected byte[] rpcCall() throws Exception {
    */
   private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
     final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
-    final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    final Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
     // <region start key, LQI> need synchronized only within this scope of this
     // phase because of the puts that happen in futures.
     Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
@@ -649,7 +649,7 @@ private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase
       @Override
       public Pair<List<LoadQueueItem>, String> call() throws Exception {
         Pair<List<LoadQueueItem>, String> splits =
-          groupOrSplit(regionGroups, item, table, startEndKeys);
+          groupOrSplit(regionGroups, item, table, startEndKeys, copyFile);
         return splits;
       }
     };
@@ -684,7 +684,7 @@ public Pair<List<LoadQueueItem>, String> call() throws Exception {
   }
 
   private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
-    byte[] startKey, byte[] splitKey) throws IOException {
+    byte[] startKey, byte[] splitKey, boolean copyFile) throws IOException {
     Path hfilePath = item.getFilePath();
     byte[] family = item.getFamily();
     Path tmpDir = hfilePath.getParent();
@@ -716,7 +716,10 @@ private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table
     // we don't need it anymore. Clean up to save space.
     // It is not part of the original input files.
     try {
-      if (tmpDir.getName().equals(TMP_DIR)) {
+      if (copyFile && !hfilePath.getParent().getName().equals(TMP_DIR)) {
+        // do nothing
+        // Retain the hfiles before splitting.
+      } else {
         fs.delete(hfilePath, false);
       }
     } catch (IOException e) {
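Note that `tmpDir` is re-pointed at the `.tmp` subdirectory earlier in the existing method (outside this hunk) whenever the input is not already under `.tmp`, so the old check matched essentially every input and the just-split file was deleted even when it was one of the caller's original hfiles. The new check looks at `hfilePath.getParent()` directly and keeps the file when `copyFile` is set. A condensed restatement of the new cleanup rule, not the literal method body:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch, not part of the patch: cleanup of the just-split input hfile. */
final class SplitCleanupSketch {
  static final String TMP_DIR = ".tmp";

  static void cleanUpSplitInput(FileSystem fs, Path hfilePath, boolean copyFile)
      throws IOException {
    boolean isOriginalInput = !hfilePath.getParent().getName().equals(TMP_DIR);
    if (copyFile && isOriginalInput) {
      // always.copy.files is set and this file came from the caller: retain it.
      return;
    }
    // Either a throwaway ".tmp" split artifact, or the caller did not ask to keep
    // its inputs: delete the file to save space.
    fs.delete(hfilePath, false);
  }
}
```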
@@ -779,7 +782,7 @@ private void checkRegionIndexValid(int idx, final Pair<byte[][], byte[][]> start
   @InterfaceAudience.Private
   protected Pair<List<LoadQueueItem>, String> groupOrSplit(
     Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
-    final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    final Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
     Path hfilePath = item.getFilePath();
     Optional<byte[]> first, last;
     try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
@@ -823,7 +826,7 @@ CacheConfig.DISABLED, true, getConf())) {
       checkRegionIndexValid(splitIdx, startEndKeys, table.getName());
     }
     List<LoadQueueItem> lqis = splitStoreFile(item, table,
-      startEndKeys.getFirst()[firstKeyRegionIdx], startEndKeys.getSecond()[splitIdx]);
+      startEndKeys.getFirst()[firstKeyRegionIdx], startEndKeys.getSecond()[splitIdx], copyFile);
     return new Pair<>(lqis, null);
   }
 
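Taken together, the remaining client-side hunks are mechanical: the existing `copyFile` flag is threaded from `loadHFileQueue` and `performBulkLoad` through `groupOrSplitPhase` and `groupOrSplit` into `splitStoreFile`, where it now drives the retain-or-delete decision shown above. The test overrides below are updated for the widened `groupOrSplit` signature and pass `false` to the superclass call.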
@@ -445,9 +445,9 @@ public void testGroupOrSplitPresplit() throws Exception {
       @Override
       protected Pair<List<LoadQueueItem>, String> groupOrSplit(
         Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
-        final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        final Table htable, final Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
         Pair<List<LoadQueueItem>, String> lqis =
-          super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+          super.groupOrSplit(regionGroups, item, htable, startEndKeys, false);
         if (lqis != null && lqis.getFirst() != null) {
           countedLqis.addAndGet(lqis.getFirst().size());
         }
@@ -545,13 +545,13 @@ public void testGroupOrSplitFailure() throws Exception {
       @Override
       protected Pair<List<LoadQueueItem>, String> groupOrSplit(
         Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
-        final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        final Table table, final Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
         i++;
 
         if (i == 5) {
           throw new IOException("failure");
         }
-        return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+        return super.groupOrSplit(regionGroups, item, table, startEndKeys, false);
       }
     };
 
@@ -586,9 +586,9 @@ public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
       @Override
       protected Pair<List<LoadQueueItem>, String> groupOrSplit(
         Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
-        final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        final Table htable, final Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
         Pair<List<LoadQueueItem>, String> lqis =
-          super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+          super.groupOrSplit(regionGroups, item, htable, startEndKeys, false);
         if (lqis != null && lqis.getFirst() != null) {
           countedLqis.addAndGet(lqis.getFirst().size());
         }