
Commit 49c9321

add UT
1 parent 1c2d3ce commit 49c9321

File tree

9 files changed: +563 −17 lines changed


hbase-replication/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -98,6 +98,16 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>

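The two Hamcrest artifacts are test-scoped and simply put the assertThat/Matchers style used by the new unit test on the hbase-replication test classpath. As a minimal, self-contained sketch of that style (class name is illustrative, not part of this commit; only standard JUnit 4 and Hamcrest APIs are used):

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.empty;

    import java.util.ArrayList;
    import java.util.List;
    import org.junit.Test;

    public class HamcrestStyleExample {

      @Test
      public void emptyCollectionReadsNaturally() {
        List<String> walsLeft = new ArrayList<>();
        // empty() produces a descriptive failure message if the list is not empty,
        // which is why the test below prefers it over assertTrue(list.isEmpty()).
        assertThat(walsLeft, empty());
      }
    }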
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java

Lines changed: 0 additions & 6 deletions
@@ -77,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {

   private final TableName tableName;

-  @FunctionalInterface
-  private interface TableCreator {
-
-    void create() throws IOException;
-  }
-
   public TableReplicationQueueStorage(Connection conn, TableName tableName) {
     this.conn = conn;
     this.tableName = tableName;

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java

Lines changed: 34 additions & 6 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication;

+import com.google.errorprone.annotations.RestrictedApi;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -239,14 +240,19 @@ public List<ZkLastPushedSeqId> next() throws Exception {
           return null;
         }
         if (level1Prefix != null) {
+          // this will also delete the previous level2Prefix which is under this level1Prefix
           ZKUtil.deleteNodeRecursively(zookeeper,
             ZNodePaths.joinZNode(regionsZNode, level1Prefix));
         }
         level1Prefix = level1Iter.next();
-        List<String> level2Prefix = ZKUtil.listChildrenNoWatch(zookeeper,
+        List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
           ZNodePaths.joinZNode(regionsZNode, level1Prefix));
-        if (level2Prefix != null) {
-          level2Iter = level2Prefix.iterator();
+        if (level2Prefixes != null) {
+          level2Iter = level2Prefixes.iterator();
+          // reset level2Prefix as we have switched level1Prefix, otherwise the below delete
+          // level2Prefix section will delete the znode with this level2Prefix under the new
+          // level1Prefix
+          level2Prefix = null;
         }
       } else {
         if (level2Prefix != null) {
@@ -314,10 +320,32 @@ public Pair<String, List<String>> next() throws KeeperException {
   }

   public boolean hasData() throws KeeperException {
-    return ZKUtil.checkExists(zookeeper, replicationZNode) != -1;
+    return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
+      || ZKUtil.checkExists(zookeeper, regionsZNode) != -1
+      || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
+  }
+
+  public void deleteAllData() throws KeeperException {
+    ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+    ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+    ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  String getQueuesZNode() {
+    return queuesZNode;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  String getHfileRefsZNode() {
+    return hfileRefsZNode;
   }

-  public void deleteRootZNode() throws KeeperException {
-    ZKUtil.deleteNodeRecursively(zookeeper, replicationZNode);
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  String getRegionsZNode() {
+    return regionsZNode;
   }
 }
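With this change, hasData() reports whether any of the three data znodes (queues, last-pushed sequence ids under regions, hfile refs) still exists, and deleteAllData() removes all three, instead of keying off the replication root znode as the old deleteRootZNode() did. A rough sketch of how a caller might combine the two new methods; this is not code from the commit, and the helper name is hypothetical:

    // Hypothetical cleanup helper; only hasData() and deleteAllData() come from this commit.
    void cleanupOldZkQueueData(ZKWatcher zk, Configuration conf) throws KeeperException {
      ZKReplicationQueueStorage oldStorage = new ZKReplicationQueueStorage(zk, conf);
      if (oldStorage.hasData()) {
        // migrate queues, last pushed sequence ids and hfile refs to the new
        // queue storage here (outside the scope of this sketch), then drop the old data
        oldStorage.deleteAllData();
      }
    }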
hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java (new file)

Lines changed: 289 additions & 0 deletions
@@ -0,0 +1,289 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage.ZkReplicationQueueData;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);

  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();

  private ZKWatcher zk;

  private ZKReplicationQueueStorage storage;

  @Rule
  public final TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniZKCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniZKCluster();
  }

  @Before
  public void setUp() throws IOException {
    Configuration conf = UTIL.getConfiguration();
    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
    zk = new ZKWatcher(conf, name.getMethodName(), null);
    storage = new ZKReplicationQueueStorage(zk, conf);
  }

  @After
  public void tearDown() throws Exception {
    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
    Closeables.close(zk, true);
  }

  @Test
  public void testDeleteAllData() throws Exception {
    assertFalse(storage.hasData());
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    assertTrue(storage.hasData());
    storage.deleteAllData();
    assertFalse(storage.hasData());
  }

  @Test
  public void testEmptyIter() throws Exception {
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
    assertNull(storage.listAllQueues().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
    assertNull(storage.listAllLastPushedSeqIds().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
    assertNull(storage.listAllHFileRefs().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }

  @Test
  public void testListAllQueues() throws Exception {
    String peerId = "1";
    ServerName deadServer =
      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
    int nServers = 10;
    for (int i = 0; i < nServers; i++) {
      ServerName sn =
        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
      ZKUtil.createWithParents(zk, peerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(peerZNode, "wal-" + j);
        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
      }
      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
      ZKUtil.createWithParents(zk, deadServerPeerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, "wal-" + j);
        if (j > 0) {
          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
        } else {
          ZKUtil.createWithParents(zk, wal);
        }
      }
    }
    ZKUtil.createWithParents(zk,
      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      storage.listAllQueues();
    ServerName previousServerName = null;
    for (int i = 0; i < nServers + 1; i++) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      assertNotNull(pair);
      if (previousServerName != null) {
        assertEquals(-1, ZKUtil.checkExists(zk,
          ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
      }
      ServerName sn = pair.getFirst();
      previousServerName = sn;
      if (sn.equals(deadServer)) {
        assertThat(pair.getSecond(), empty());
      } else {
        assertEquals(2, pair.getSecond().size());
        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split((sn.getHostname()))));
        ZkReplicationQueueData data0 = pair.getSecond().get(0);
        assertEquals(peerId, data0.getQueueId().getPeerId());
        assertEquals(sn, data0.getQueueId().getServerName());
        assertEquals(n, data0.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j, data0.getWalOffsets().get("wal-" + j).intValue());
        }
        ZkReplicationQueueData data1 = pair.getSecond().get(1);
        assertEquals(peerId, data1.getQueueId().getPeerId());
        assertEquals(sn, data1.getQueueId().getServerName());
        assertEquals(n, data1.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j, data1.getWalOffsets().get("wal-" + j).intValue());
        }
        // the order of the returned result is undetermined
        if (data0.getQueueId().getSourceServerName().isPresent()) {
          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
        } else {
          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
        }
      }
    }
    assertNull(iter.next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
  }

  private String getLastPushedSeqIdZNode(String encodedName, String peerId) {
    return ZNodePaths.joinZNode(storage.getRegionsZNode(), encodedName.substring(0, 2),
      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
  }

  @Test
  public void testListAllLastPushedSeqIds() throws Exception {
    String peerId1 = "1";
    String peerId2 = "2";
    Map<String, Set<String>> name2PeerIds = new HashMap<>();
    byte[] bytes = new byte[32];
    for (int i = 0; i < 100; i++) {
      ThreadLocalRandom.current().nextBytes(bytes);
      String encodeName = MD5Hash.getMD5AsHex(bytes);
      String znode1 = getLastPushedSeqIdZNode(encodeName, peerId1);
      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
      String znode2 = getLastPushedSeqIdZNode(encodeName, peerId2);
      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
    }
    int addedEmptyZNodes = 0;
    for (int i = 0; i < 256; i++) {
      String level1ZNode =
        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
        ZKUtil.createWithParents(zk, level1ZNode);
        addedEmptyZNodes++;
        if (addedEmptyZNodes <= 10) {
          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
        }
        if (addedEmptyZNodes >= 20) {
          break;
        }

      }
    }
    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
    int emptyListCount = 0;
    for (;;) {
      List<ZkLastPushedSeqId> list = iter.next();
      if (list == null) {
        break;
      }
      if (list.isEmpty()) {
        emptyListCount++;
        continue;
      }
      for (ZkLastPushedSeqId seqId : list) {
        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
        if (seqId.getPeerId().equals(peerId1)) {
          assertEquals(1, seqId.getLastPushedSeqId());
        } else {
          assertEquals(2, seqId.getLastPushedSeqId());
        }
      }
    }
    assertEquals(10, emptyListCount);
    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
    });
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
  }

  @Test
  public void testListAllHFilesRefs() throws Exception {
    int nPeers = 10;
    for (int i = 0; i < nPeers; i++) {
      String peerId = "peer_" + i;
      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
      for (int j = 0; j < i; j++) {
        ZKUtil.createWithParents(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
      }
    }
    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
    String previousPeerId = null;
    for (int i = 0; i < nPeers; i++) {
      Pair<String, List<String>> pair = iter.next();
      if (previousPeerId != null) {
        assertEquals(-1, ZKUtil.checkExists(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
      }
      String peerId = pair.getFirst();
      previousPeerId = peerId;
      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
      assertEquals(index, pair.getSecond().size());
    }
    assertNull(iter.next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }
}
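To run just this new test locally, the usual Maven Surefire invocation should work (exact profiles and flags may vary by branch):

    mvn test -pl hbase-replication -Dtest=TestZKReplicationQueueStorage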
