
Commit b46169d

Yiran-wu authored and stoty committed
HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (apache#553)

Signed-off-by: Sakthi <sakthi@apache.org>
(cherry picked from commit f31301d)
Change-Id: Ic88e96cc1b83351424463fef52579a35a4074822
1 parent fd27d68 commit b46169d
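
The scenario this commit fixes, in brief: with space quotas enabled, the server-side quota check sized incoming HFiles against the region server's own FileSystem, so a bulk load whose files were staged on a different HDFS failed before any data moved. A minimal repro sketch under assumed names (the cluster address, table name, and staging path below are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class CrossHdfsBulkLoadRepro {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); // hbase.quota.enabled
    // HFiles staged on a different HDFS than the one backing HBase:
    Path stagingDir = new Path("hdfs://other-cluster:8020/staging/bulkload");
    // Before this fix, the quota check tried to stat these paths with the
    // region server's local FileSystem and the load failed.
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("t1"), stagingDir);
  }
}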

File tree: 2 files changed, +55 -10 lines

hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java

Lines changed: 44 additions & 9 deletions

@@ -37,6 +37,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -63,6 +64,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
   protected void customizeClusterConf(Configuration conf) {
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf.set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
     String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -81,20 +83,21 @@ public void testSyncUpTool() throws Exception {
     setupReplication();

     /**
-     * Prepare 16 random hfile ranges required for creating hfiles
+     * Prepare 24 random hfile ranges required for creating hfiles
      */
     Iterator<String> randomHFileRangeListIterator = null;
-    Set<String> randomHFileRanges = new HashSet<>(16);
-    for (int i = 0; i < 16; i++) {
+    Set<String> randomHFileRanges = new HashSet<>(24);
+    for (int i = 0; i < 24; i++) {
       randomHFileRanges.add(UTIL1.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
     Collections.sort(randomHFileRangeList);
     randomHFileRangeListIterator = randomHFileRangeList.iterator();

     /**
-     * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
-     * into cf1, and 3 rows into norep verify correctly replicated to slave
+     * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
+     * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
+     * rows into norep verify correctly replicated to slave
      */
     loadAndReplicateHFiles(true, randomHFileRangeListIterator);

@@ -175,23 +178,35 @@ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
       Iterator<String> randomHFileRangeListIterator) throws Exception {
     LOG.debug("loadAndReplicateHFiles");

-    // Load 100 + 3 hfiles to t1_syncup.
+    // Load 50 + 50 + 3 hfiles to t1_syncup.
     byte[][][] hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
             Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
+        hfileRanges, 50);

     hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
             Bytes.toBytes(randomHFileRangeListIterator.next()) } };
     loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
         hfileRanges, 3);

-    // Load 200 + 3 hfiles to t2_syncup.
+    // Load 100 + 100 + 3 hfiles to t2_syncup.
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);
+
     hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
             Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
+        hfileRanges, 100);

     hfileRanges =
         new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[]
     loader.bulkLoad(tableName, dir);
   }

+  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+    FileSystem fs = UTIL2.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+    loader.bulkLoad(tableName, dir);
+  }
+
   private void wait(Table target, int expectedCount, String msg)
       throws IOException, InterruptedException {
     for (int i = 0; i < NB_RETRIES; i++) {
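
The essential move in the new test helper is qualifying the staging directory against the second cluster's FileSystem, so the path handed to the loader carries the source cluster's scheme and authority. A short sketch of that pattern, assuming the same two mini-cluster utilities (UTIL1, UTIL2) as the test above:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

// Stage under the second cluster (UTIL2) and qualify the path so it embeds
// that cluster's URI, e.g. hdfs://<util2-namenode>:<port>/user/.../staging.
FileSystem sourceFs = UTIL2.getTestFileSystem();
Path dir = UTIL2.getDataTestDirOnTestFS("staging").makeQualified(
  sourceFs.getUri(), sourceFs.getWorkingDirectory());
// Load into cluster 1: the fully qualified path lets the region server
// resolve the source FileSystem during its quota check.
TableName tableName = TableName.valueOf("t1_syncup");
BulkLoadHFiles.create(UTIL1.getConfiguration()).bulkLoad(tableName, dir);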

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 11 additions & 1 deletion

@@ -47,6 +47,7 @@
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2376,7 +2377,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
           filePaths.add(familyPath.getPath());
         }
         // Check if the batch of files exceeds the current quota
-        sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+        sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
       }
     }

@@ -2481,6 +2482,15 @@ public CoprocessorServiceResponse execService(final RpcController controller,
     }
   }

+  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+    if (filePaths.isEmpty()) {
+      // local hdfs
+      return regionServer.getFileSystem();
+    }
+    // source hdfs
+    return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+  }
+
   private com.google.protobuf.Message execServiceOnRegion(HRegion region,
       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
     // ignore the passed in controller (from the serialized call)
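
The server-side fix relies on standard Hadoop path resolution: a fully qualified Path resolves its FileSystem from its own scheme and authority, while the region server's FileSystem handle is pinned to the local cluster and rejects foreign paths with a "Wrong FS" IllegalArgumentException. A small sketch of that resolution (the remote path below is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
// A staged HFile path carries the source cluster's authority:
Path remote = new Path("hdfs://source-nn:8020/staging/f1/hfile_0");
// Resolves a client for hdfs://source-nn:8020 rather than the local FS:
FileSystem sourceFs = remote.getFileSystem(conf);
// Sizing the file for the quota check now works across clusters:
long sizeToBeLoaded = sourceFs.getFileStatus(remote).getLen();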
