Skip to content

Commit fec4c52

Browse files
authored
HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated
Signed-off-by: stack <stack@apache.org>
1 parent ba12d5b commit fec4c52

File tree

15 files changed

+123
-35
lines changed

15 files changed

+123
-35
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2570,16 +2570,19 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN
25702570
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
25712571
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
25722572
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
2573-
storeFilesSize, bulkloadSeqId, null);
2573+
storeFilesSize, bulkloadSeqId, null, true);
25742574
}
25752575

25762576
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
25772577
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
2578-
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
2578+
Map<String, Long> storeFilesSize, long bulkloadSeqId,
2579+
List<String> clusterIds, boolean replicate) {
25792580
BulkLoadDescriptor.Builder desc =
25802581
BulkLoadDescriptor.newBuilder()
2581-
.setTableName(ProtobufUtil.toProtoTableName(tableName))
2582-
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
2582+
.setTableName(ProtobufUtil.toProtoTableName(tableName))
2583+
.setEncodedRegionName(encodedRegionName)
2584+
.setBulkloadSeqNum(bulkloadSeqId)
2585+
.setReplicate(replicate);
25832586
if(clusterIds != null) {
25842587
desc.addAllClusterIds(clusterIds);
25852588
}

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
568568
final byte[] regionName, boolean assignSeqNum,
569569
final Token<?> userToken, final String bulkToken) {
570570
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
571-
false, null);
571+
false, null, true);
572572
}
573573

574574
/**
@@ -585,7 +585,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
585585
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
586586
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
587587
final Token<?> userToken, final String bulkToken, boolean copyFiles,
588-
List<String> clusterIds) {
588+
List<String> clusterIds, boolean replicate) {
589589
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
590590
RegionSpecifierType.REGION_NAME, regionName);
591591

@@ -626,6 +626,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
626626
if (clusterIds != null) {
627627
request.addAllClusterIds(clusterIds);
628628
}
629+
request.setReplicate(replicate);
629630
return request.build();
630631
}
631632

hbase-protocol-shaded/src/main/protobuf/Client.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
379379
optional string bulk_token = 5;
380380
optional bool copy_file = 6 [default = false];
381381
repeated string cluster_ids = 7;
382+
optional bool replicate = 8 [default = true];
382383

383384
message FamilyPath {
384385
required bytes family = 1;

hbase-protocol-shaded/src/main/protobuf/WAL.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
151151
repeated StoreDescriptor stores = 3;
152152
required int64 bulkload_seq_num = 4;
153153
repeated string cluster_ids = 5;
154+
optional bool replicate = 6 [default = true];
154155
}
155156

156157
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,18 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
8181
* Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
8282
* not expect additional clusterIds param.
8383
* @param tableName the target table
84-
* @param familyPaths hdfs path for the the table family dirs containg files to be loaded
85-
* @param row row key
86-
* @param assignSeqNum seq num for the event on WAL
87-
* @param userToken user token
88-
* @param bulkToken bulk load token
89-
* @param copyFiles flag for copying the loaded hfiles
84+
* @param familyPaths hdfs path for the the table family dirs containg files to be loaded.
85+
* @param row row key.
86+
* @param assignSeqNum seq num for the event on WAL.
87+
* @param userToken user token.
88+
* @param bulkToken bulk load token.
89+
* @param copyFiles flag for copying the loaded hfiles.
9090
* @param clusterIds list of cluster ids where the given bulk load has already been processed.
91+
* @param replicate flags if the bulkload is targeted for replication.
9192
*/
9293
CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
9394
byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
94-
List<String> clusterIds);
95+
List<String> clusterIds, boolean replicate);
9596

9697
/**
9798
* Clean up after finishing bulk load, no matter success or not.

hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,13 @@ public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
109109
@Override
110110
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
111111
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
112-
String bulkToken, boolean copyFiles, List<String> clusterIds) {
112+
String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
113113
return callerFactory.<Boolean> single().table(tableName).row(row)
114114
.action((controller, loc, stub) -> ConnectionUtils
115115
.<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
116116
null,
117117
(rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
118-
userToken, bulkToken, copyFiles, clusterIds),
118+
userToken, bulkToken, copyFiles, clusterIds, replicate),
119119
(s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
120120
.call();
121121
}

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -827,7 +827,9 @@ private void bulkloadRefFile(TableName tableName, Path bulkloadDirectory, String
827827
throws IOException {
828828
// bulkload the ref file
829829
try {
830-
BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
830+
BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
831+
bulkLoader.disableReplication();
832+
bulkLoader.bulkLoad(tableName, bulkloadDirectory);
831833
} catch (Exception e) {
832834
throw new IOException(e);
833835
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6146,7 +6146,8 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
61466146
*/
61476147
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
61486148
BulkLoadListener bulkLoadListener) throws IOException {
6149-
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
6149+
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
6150+
null, true);
61506151
}
61516152

61526153
/**
@@ -6197,7 +6198,7 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
61976198
*/
61986199
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
61996200
boolean assignSeqId, BulkLoadListener bulkLoadListener,
6200-
boolean copyFile, List<String> clusterIds) throws IOException {
6201+
boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
62016202
long seqId = -1;
62026203
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
62036204
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6372,7 +6373,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
63726373
WALProtos.BulkLoadDescriptor loadDescriptor =
63736374
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
63746375
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
6375-
storeFiles, storeFilesSizes, seqId, clusterIds);
6376+
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
63766377
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
63776378
loadDescriptor, mvcc);
63786379
} catch (IOException ioe) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public Map<byte[], List<Path>> run() {
296296
//To enable access prior to staging
297297
return region.bulkLoadHFiles(familyPaths, true,
298298
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
299-
clusterIds);
299+
clusterIds, request.getReplicate());
300300
} catch (Exception e) {
301301
LOG.error("Failed to complete bulk load", e);
302302
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -203,18 +203,19 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
203203
// Handle bulk load hfiles replication
204204
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
205205
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
206-
if(bulkLoadsPerClusters == null) {
207-
bulkLoadsPerClusters = new HashMap<>();
208-
}
209-
// Map of table name Vs list of pair of family and list of
210-
// hfile paths from its namespace
211-
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
212-
bulkLoadsPerClusters.get(bld.getClusterIdsList());
213-
if (bulkLoadHFileMap == null) {
214-
bulkLoadHFileMap = new HashMap<>();
215-
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
206+
if(bld.getReplicate()) {
207+
if (bulkLoadsPerClusters == null) {
208+
bulkLoadsPerClusters = new HashMap<>();
209+
}
210+
// Map of table name Vs list of pair of family and list of
211+
// hfile paths from its namespace
212+
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
213+
if (bulkLoadHFileMap == null) {
214+
bulkLoadHFileMap = new HashMap<>();
215+
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
216+
}
217+
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
216218
}
217-
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
218219
} else {
219220
// Handle wal replication
220221
if (isNewRowOrType(previousCell, cell)) {

0 commit comments

Comments
 (0)