Skip to content

Commit ccfbdad

Browse files
Yiran-wusaintstack
authored andcommitted
HBASE-23098 [bulkload] If one of the peers in a cluster is configured with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: stack <stack@apache.org>
1 parent abcb1ee commit ccfbdad

File tree

4 files changed

+328
-13
lines changed

4 files changed

+328
-13
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Set;
3132
import java.util.TreeMap;
3233
import java.util.UUID;
3334
import java.util.concurrent.ConcurrentHashMap;
@@ -227,16 +228,25 @@ public void enqueueLog(Path log) {
227228
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
228229
throws ReplicationException {
229230
String peerId = replicationPeer.getId();
231+
Set<String> namespaces = replicationPeer.getNamespaces();
230232
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
231-
if (tableCFMap != null) {
233+
if (tableCFMap != null) { // All peers with TableCFs
232234
List<String> tableCfs = tableCFMap.get(tableName);
233235
if (tableCFMap.containsKey(tableName)
234236
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
235237
this.queueStorage.addHFileRefs(peerId, pairs);
236238
metrics.incrSizeOfHFileRefsQueue(pairs.size());
237239
} else {
238240
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
239-
tableName, Bytes.toString(family), peerId);
241+
tableName, Bytes.toString(family), peerId);
242+
}
243+
} else if (namespaces != null) { // Only for set NAMESPACES peers
244+
if (namespaces.contains(tableName.getNamespaceAsString())) {
245+
this.queueStorage.addHFileRefs(peerId, pairs);
246+
metrics.incrSizeOfHFileRefsQueue(pairs.size());
247+
} else {
248+
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
249+
tableName, Bytes.toString(family), peerId);
240250
}
241251
} else {
242252
// user has explicitly not defined any table cfs for replication, means replicate all the

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.hadoop.hbase.HBaseTestingUtility;
4848
import org.apache.hadoop.hbase.HConstants;
4949
import org.apache.hadoop.hbase.KeyValue;
50+
import org.apache.hadoop.hbase.TableName;
5051
import org.apache.hadoop.hbase.client.Admin;
5152
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
5253
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -122,8 +123,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
122123
private static AtomicInteger BULK_LOADS_COUNT;
123124
private static CountDownLatch BULK_LOAD_LATCH;
124125

125-
private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
126-
private static final Configuration CONF3 = UTIL3.getConfiguration();
126+
protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
127+
protected static final Configuration CONF3 = UTIL3.getConfiguration();
127128

128129
private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
129130

@@ -220,7 +221,7 @@ public void tearDownBase() throws Exception {
220221
UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
221222
}
222223

223-
private static void setupBulkLoadConfigsForCluster(Configuration config,
224+
protected static void setupBulkLoadConfigsForCluster(Configuration config,
224225
String clusterReplicationId) throws Exception {
225226
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
226227
config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
@@ -238,13 +239,16 @@ public void testBulkLoadReplicationActiveActive() throws Exception {
238239
Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
239240
byte[] row = Bytes.toBytes("001");
240241
byte[] value = Bytes.toBytes("v1");
241-
assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
242+
assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
243+
peer2TestTable, peer3TestTable);
242244
row = Bytes.toBytes("002");
243245
value = Bytes.toBytes("v2");
244-
assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
246+
assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable,
247+
peer2TestTable, peer3TestTable);
245248
row = Bytes.toBytes("003");
246249
value = Bytes.toBytes("v3");
247-
assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
250+
assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable,
251+
peer2TestTable, peer3TestTable);
248252
//Additional wait to make sure no extra bulk load happens
249253
Thread.sleep(400);
250254
//We have 3 bulk load events (1 initiated on each cluster).
@@ -278,18 +282,18 @@ public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception
278282
}
279283

280284

281-
private void assertBulkLoadConditions(byte[] row, byte[] value,
285+
protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
282286
HBaseTestingUtility utility, Table...tables) throws Exception {
283287
BULK_LOAD_LATCH = new CountDownLatch(3);
284-
bulkLoadOnCluster(row, value, utility);
288+
bulkLoadOnCluster(tableName, row, value, utility);
285289
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
286290
assertTableHasValue(tables[0], row, value);
287291
assertTableHasValue(tables[1], row, value);
288292
assertTableHasValue(tables[2], row, value);
289293
}
290294

291-
private void bulkLoadOnCluster(byte[] row, byte[] value,
292-
HBaseTestingUtility cluster) throws Exception {
295+
protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
296+
HBaseTestingUtility cluster) throws Exception {
293297
String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
294298
copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
295299
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
@@ -302,13 +306,19 @@ private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws
302306
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
303307
}
304308

305-
private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
309+
protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
306310
Get get = new Get(row);
307311
Result result = table.get(get);
308312
assertTrue(result.advance());
309313
assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
310314
}
311315

316+
protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
317+
Get get = new Get(row);
318+
Result result = table.get(get);
319+
assertTrue(result.isEmpty());
320+
}
321+
312322
private String createHFileForFamilies(byte[] row, byte[] value,
313323
Configuration clusterConfig) throws IOException {
314324
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);

0 commit comments

Comments
 (0)