17
17
*/
18
18
package org .apache .hadoop .hbase .util ;
19
19
20
+ import static org .junit .Assert .assertEquals ;
21
+
22
+ import java .util .Collections ;
23
+ import java .util .List ;
24
+ import java .util .stream .Stream ;
20
25
import org .apache .hadoop .hbase .HBaseClassTestRule ;
21
26
import org .apache .hadoop .hbase .HBaseTestingUtil ;
27
+ import org .apache .hadoop .hbase .ServerName ;
28
+ import org .apache .hadoop .hbase .TableName ;
29
+ import org .apache .hadoop .hbase .replication .ReplicationGroupOffset ;
30
+ import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
31
+ import org .apache .hadoop .hbase .replication .ReplicationPeerStorage ;
32
+ import org .apache .hadoop .hbase .replication .ReplicationQueueId ;
33
+ import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
34
+ import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
35
+ import org .apache .hadoop .hbase .replication .SyncReplicationState ;
22
36
import org .apache .hadoop .hbase .testclassification .MediumTests ;
23
37
import org .apache .hadoop .hbase .testclassification .ReplicationTests ;
24
- import org .junit .AfterClass ;
25
- import org .junit .BeforeClass ;
38
+ import org .apache .hadoop .hbase .util .HbckErrorReporter .ERROR_CODE ;
39
+ import org .apache .hadoop .hbase .util .hbck .HbckTestingUtil ;
40
+ import org .junit .After ;
41
+ import org .junit .Before ;
26
42
import org .junit .ClassRule ;
27
- import org .junit .Ignore ;
43
+ import org .junit .Rule ;
28
44
import org .junit .Test ;
29
45
import org .junit .experimental .categories .Category ;
46
+ import org .junit .rules .TestName ;
30
47
31
48
@ Category ({ ReplicationTests .class , MediumTests .class })
32
49
public class TestHBaseFsckReplication {
@@ -36,65 +53,78 @@ public class TestHBaseFsckReplication {
36
53
HBaseClassTestRule .forClass (TestHBaseFsckReplication .class );
37
54
38
55
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil ();
56
+ @ Rule
57
+ public final TestName name = new TestName ();
39
58
40
- @ BeforeClass
41
- public static void setUp () throws Exception {
59
+ @ Before
60
+ public void setUp () throws Exception {
42
61
UTIL .getConfiguration ().setBoolean ("hbase.write.hbck1.lock.file" , false );
43
62
UTIL .startMiniCluster (1 );
63
+ TableName tableName = TableName .valueOf ("replication_" + name .getMethodName ());
64
+ UTIL .getAdmin ()
65
+ .createTable (ReplicationStorageFactory .createReplicationQueueTableDescriptor (tableName ));
66
+ UTIL .getConfiguration ().set (ReplicationStorageFactory .REPLICATION_QUEUE_TABLE_NAME ,
67
+ tableName .getNameAsString ());
44
68
}
45
69
46
- @ AfterClass
47
- public static void tearDown () throws Exception {
70
+ @ After
71
+ public void tearDown () throws Exception {
48
72
UTIL .shutdownMiniCluster ();
49
73
}
50
74
51
- // TODO: reimplement
52
- @ Ignore
53
75
@ Test
54
76
public void test () throws Exception {
55
- // ReplicationPeerStorage peerStorage = ReplicationStorageFactory
56
- // .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
57
- // ReplicationQueueStorage queueStorage = ReplicationStorageFactory
58
- // .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
59
- //
60
- // String peerId1 = "1";
61
- // String peerId2 = "2";
62
- // peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
63
- // true, SyncReplicationState.NONE);
64
- // peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
65
- // true, SyncReplicationState.NONE);
66
- // for (int i = 0; i < 10; i++) {
67
- // queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1,
68
- // "file-" + i);
69
- // }
70
- // queueStorage.addWAL(ServerName.valueOf("localhost", 10000, 100000), peerId2, "file");
71
- // HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
72
- // HbckTestingUtil.assertNoErrors(fsck);
73
- //
74
- // // should not remove anything since the replication peer is still alive
75
- // assertEquals(10, queueStorage.getListOfReplicators().size());
76
- // peerStorage.removePeer(peerId1);
77
- // // there should be orphan queues
78
- // assertEquals(10, queueStorage.getListOfReplicators().size());
79
- // fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false);
80
- // HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
81
- // return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
82
- // }).limit(10).toArray(ERROR_CODE[]::new));
83
- //
84
- // // should not delete anything when fix is false
85
- // assertEquals(10, queueStorage.getListOfReplicators().size());
86
- //
87
- // fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
88
- // HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
89
- // return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
90
- // }).limit(10).toArray(ERROR_CODE[]::new));
91
- //
92
- // List<ServerName> replicators = queueStorage.getListOfReplicators();
93
- // // should not remove the server with queue for peerId2
94
- // assertEquals(1, replicators.size());
95
- // assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0));
96
- // for (String queueId : queueStorage.getAllQueues(replicators.get(0))) {
97
- // assertEquals(peerId2, queueId);
98
- // }
77
+ ReplicationPeerStorage peerStorage = ReplicationStorageFactory .getReplicationPeerStorage (
78
+ UTIL .getTestFileSystem (), UTIL .getZooKeeperWatcher (), UTIL .getConfiguration ());
79
+ ReplicationQueueStorage queueStorage = ReplicationStorageFactory
80
+ .getReplicationQueueStorage (UTIL .getConnection (), UTIL .getConfiguration ());
81
+
82
+ String peerId1 = "1" ;
83
+ String peerId2 = "2" ;
84
+ peerStorage .addPeer (peerId1 , ReplicationPeerConfig .newBuilder ().setClusterKey ("key" ).build (),
85
+ true , SyncReplicationState .NONE );
86
+ peerStorage .addPeer (peerId2 , ReplicationPeerConfig .newBuilder ().setClusterKey ("key" ).build (),
87
+ true , SyncReplicationState .NONE );
88
+ ReplicationQueueId queueId = null ;
89
+ for (int i = 0 ; i < 10 ; i ++) {
90
+ queueId = new ReplicationQueueId (getServerName (i ), peerId1 );
91
+ queueStorage .setOffset (queueId , "group-" + i ,
92
+ new ReplicationGroupOffset ("file-" + i , i * 100 ), Collections .emptyMap ());
93
+ }
94
+ queueId = new ReplicationQueueId (getServerName (0 ), peerId2 );
95
+ queueStorage .setOffset (queueId , "group-" + 0 , new ReplicationGroupOffset ("file-" + 0 , 100 ),
96
+ Collections .emptyMap ());
97
+ HBaseFsck fsck = HbckTestingUtil .doFsck (UTIL .getConfiguration (), true );
98
+ HbckTestingUtil .assertNoErrors (fsck );
99
+
100
+ // should not remove anything since the replication peer is still alive
101
+ assertEquals (10 , queueStorage .listAllReplicators ().size ());
102
+ peerStorage .removePeer (peerId1 );
103
+ // there should be orphan queues
104
+ assertEquals (10 , queueStorage .listAllReplicators ().size ());
105
+ fsck = HbckTestingUtil .doFsck (UTIL .getConfiguration (), false );
106
+ HbckTestingUtil .assertErrors (fsck , Stream .generate (() -> {
107
+ return ERROR_CODE .UNDELETED_REPLICATION_QUEUE ;
108
+ }).limit (10 ).toArray (ERROR_CODE []::new ));
109
+
110
+ // should not delete anything when fix is false
111
+ assertEquals (10 , queueStorage .listAllReplicators ().size ());
112
+
113
+ fsck = HbckTestingUtil .doFsck (UTIL .getConfiguration (), true );
114
+ HbckTestingUtil .assertErrors (fsck , Stream .generate (() -> {
115
+ return ERROR_CODE .UNDELETED_REPLICATION_QUEUE ;
116
+ }).limit (10 ).toArray (HbckErrorReporter .ERROR_CODE []::new ));
117
+
118
+ List <ServerName > replicators = queueStorage .listAllReplicators ();
119
+ // should not remove the server with queue for peerId2
120
+ assertEquals (1 , replicators .size ());
121
+ assertEquals (ServerName .valueOf ("localhost" , 10000 , 100000 ), replicators .get (0 ));
122
+ for (ReplicationQueueId qId : queueStorage .listAllQueueIds (replicators .get (0 ))) {
123
+ assertEquals (peerId2 , qId .getPeerId ());
124
+ }
125
+ }
126
+
127
+ private ServerName getServerName (int i ) {
128
+ return ServerName .valueOf ("localhost" , 10000 + i , 100000 + i );
99
129
}
100
130
}
0 commit comments