Skip to content

Commit 31c4aea

Browse files
authored
HBASE-27728 Implement a tool to migrate replication peer data between different storage implementation (#5179)
Signed-off-by: Liangjun He <heliangjun@apache.org> Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 63471ef commit 31c4aea

File tree

7 files changed

+343
-119
lines changed

7 files changed

+343
-119
lines changed

bin/hbase

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ show_usage() {
109109
echo " pre-upgrade Run Pre-Upgrade validator tool"
110110
echo " hbtop Run HBTop tool"
111111
echo " credential Run the Hadoop Credential Shell"
112+
echo " copyreppeers Run CopyReplicationPeers tool"
112113
echo " CLASSNAME Run the class named CLASSNAME"
113114
}
114115

@@ -769,6 +770,8 @@ elif [ "$COMMAND" = "hbtop" ] ; then
769770
HBASE_OPTS="${HBASE_OPTS} ${HBASE_HBTOP_OPTS}"
770771
elif [ "$COMMAND" = "credential" ] ; then
771772
CLASS='org.apache.hadoop.security.alias.CredentialShell'
773+
elif [ "$COMMAND" = "copyreppeers" ] ; then
774+
CLASS='org.apache.hadoop.hbase.replication.ReplicationPeerMigrationTool'
772775
else
773776
CLASS=$COMMAND
774777
if [[ "$CLASS" =~ .*IntegrationTest.* ]] ; then
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import static java.util.stream.Collectors.toList;
21+
import static java.util.stream.Collectors.toSet;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertNotNull;
24+
import static org.junit.Assert.assertTrue;
25+
26+
import java.util.HashMap;
27+
import java.util.Iterator;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Random;
31+
import java.util.Set;
32+
import java.util.stream.Stream;
33+
import org.apache.hadoop.hbase.TableName;
34+
35+
/**
36+
* A helper tool for generating random {@link ReplicationPeerConfig} and do assertion.
37+
*/
38+
public final class ReplicationPeerConfigTestUtil {
39+
40+
// Seed may be set with Random#setSeed
41+
private static final Random RNG = new Random();
42+
43+
private ReplicationPeerConfigTestUtil() {
44+
}
45+
46+
private static Set<String> randNamespaces(Random rand) {
47+
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
48+
.collect(toSet());
49+
}
50+
51+
private static Map<TableName, List<String>> randTableCFs(Random rand) {
52+
int size = rand.nextInt(5);
53+
Map<TableName, List<String>> map = new HashMap<>();
54+
for (int i = 0; i < size; i++) {
55+
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
56+
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
57+
.limit(rand.nextInt(5)).collect(toList());
58+
map.put(tn, cfs);
59+
}
60+
return map;
61+
}
62+
63+
public static ReplicationPeerConfig getConfig(int seed) {
64+
RNG.setSeed(seed);
65+
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
66+
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
67+
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
68+
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
69+
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
70+
.setBandwidth(RNG.nextInt(1000)).build();
71+
}
72+
73+
private static void assertSetEquals(Set<String> expected, Set<String> actual) {
74+
if (expected == null || expected.size() == 0) {
75+
assertTrue(actual == null || actual.size() == 0);
76+
return;
77+
}
78+
assertEquals(expected.size(), actual.size());
79+
expected.forEach(s -> assertTrue(actual.contains(s)));
80+
}
81+
82+
private static void assertMapEquals(Map<TableName, List<String>> expected,
83+
Map<TableName, List<String>> actual) {
84+
if (expected == null || expected.size() == 0) {
85+
assertTrue(actual == null || actual.size() == 0);
86+
return;
87+
}
88+
assertEquals(expected.size(), actual.size());
89+
expected.forEach((expectedTn, expectedCFs) -> {
90+
List<String> actualCFs = actual.get(expectedTn);
91+
if (expectedCFs == null || expectedCFs.size() == 0) {
92+
assertTrue(actual.containsKey(expectedTn));
93+
assertTrue(actualCFs == null || actualCFs.size() == 0);
94+
} else {
95+
assertNotNull(actualCFs);
96+
assertEquals(expectedCFs.size(), actualCFs.size());
97+
for (Iterator<String> expectedIt = expectedCFs.iterator(),
98+
actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
99+
assertEquals(expectedIt.next(), actualIt.next());
100+
}
101+
}
102+
});
103+
}
104+
105+
public static void assertConfigEquals(ReplicationPeerConfig expected,
106+
ReplicationPeerConfig actual) {
107+
assertEquals(expected.getClusterKey(), actual.getClusterKey());
108+
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
109+
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
110+
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
111+
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
112+
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
113+
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
114+
assertEquals(expected.getBandwidth(), actual.getBandwidth());
115+
}
116+
}

hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,14 @@
1717
*/
1818
package org.apache.hadoop.hbase.replication;
1919

20-
import static java.util.stream.Collectors.toList;
21-
import static java.util.stream.Collectors.toSet;
20+
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig;
2221
import static org.junit.Assert.assertEquals;
2322
import static org.junit.Assert.assertFalse;
2423
import static org.junit.Assert.assertNull;
2524
import static org.junit.Assert.assertTrue;
2625

27-
import java.util.HashMap;
2826
import java.util.List;
2927
import java.util.Map;
30-
import java.util.Random;
31-
import java.util.Set;
32-
import java.util.stream.Stream;
3328
import org.apache.hadoop.conf.Configuration;
3429
import org.apache.hadoop.hbase.HBaseClassTestRule;
3530
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -260,35 +255,6 @@ public void testNeedToReplicateCFWithoutReplicatingAll() {
260255
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
261256
}
262257

263-
private static final Random RNG = new Random(); // Seed may be set with Random#setSeed
264-
265-
private Set<String> randNamespaces(Random rand) {
266-
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
267-
.collect(toSet());
268-
}
269-
270-
private Map<TableName, List<String>> randTableCFs(Random rand) {
271-
int size = rand.nextInt(5);
272-
Map<TableName, List<String>> map = new HashMap<>();
273-
for (int i = 0; i < size; i++) {
274-
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
275-
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
276-
.limit(rand.nextInt(5)).collect(toList());
277-
map.put(tn, cfs);
278-
}
279-
return map;
280-
}
281-
282-
private ReplicationPeerConfig getConfig(int seed) {
283-
RNG.setSeed(seed);
284-
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
285-
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
286-
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
287-
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
288-
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
289-
.setBandwidth(RNG.nextInt(1000)).build();
290-
}
291-
292258
@Test
293259
public void testBaseReplicationPeerConfig() throws ReplicationException {
294260
String customPeerConfigKey = "hbase.xxx.custom_config";

hbase-replication/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@
7474
<type>test-jar</type>
7575
<scope>test</scope>
7676
</dependency>
77+
<dependency>
78+
<groupId>org.apache.hbase</groupId>
79+
<artifactId>hbase-client</artifactId>
80+
<type>test-jar</type>
81+
<scope>test</scope>
82+
</dependency>
7783
<dependency>
7884
<groupId>org.apache.hbase</groupId>
7985
<artifactId>hbase-zookeeper</artifactId>
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.conf.Configured;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.hbase.Abortable;
25+
import org.apache.hadoop.hbase.HBaseConfiguration;
26+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27+
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
28+
import org.apache.hadoop.util.Tool;
29+
import org.apache.hadoop.util.ToolRunner;
30+
import org.apache.yetus.audience.InterfaceAudience;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* A tool for copying replication peer data across different replication peer storages.
36+
* <p/>
37+
* Notice that we will not delete the replication peer data from the source storage, as this tool
38+
* can also be used by online migration. See HBASE-27110 for the whole design.
39+
*/
40+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
41+
public class CopyReplicationPeers extends Configured implements Tool {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class);
44+
45+
public static final String NAME = "copyreppeers";
46+
47+
public CopyReplicationPeers(Configuration conf) {
48+
super(conf);
49+
}
50+
51+
private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) {
52+
Configuration conf = new Configuration(getConf());
53+
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type);
54+
return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
55+
}
56+
57+
private ZKWatcher createZKWatcher() throws IOException {
58+
return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() {
59+
60+
private volatile boolean aborted;
61+
62+
@Override
63+
public boolean isAborted() {
64+
return aborted;
65+
}
66+
67+
@Override
68+
public void abort(String why, Throwable e) {
69+
aborted = true;
70+
LOG.error(why, e);
71+
System.exit(1);
72+
}
73+
});
74+
}
75+
76+
private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst)
77+
throws ReplicationException {
78+
LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(),
79+
dst.getClass().getSimpleName());
80+
for (String peerId : src.listPeerIds()) {
81+
LOG.info("Going to migrate {}", peerId);
82+
ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId);
83+
boolean enabled = src.isPeerEnabled(peerId);
84+
SyncReplicationState syncState = src.getPeerSyncReplicationState(peerId);
85+
SyncReplicationState newSyncState = src.getPeerNewSyncReplicationState(peerId);
86+
if (newSyncState != SyncReplicationState.NONE) {
87+
throw new IllegalStateException("Can not migrate peer " + peerId
88+
+ " as it is in an intermediate state, syncReplicationState is " + syncState
89+
+ " while newSyncReplicationState is " + newSyncState);
90+
}
91+
dst.addPeer(peerId, peerConfig, enabled, syncState);
92+
LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}, syncReplicationState = {}",
93+
peerId, peerConfig, enabled, syncState);
94+
}
95+
}
96+
97+
@Override
98+
public int run(String[] args) throws Exception {
99+
if (args.length != 2) {
100+
System.err.println("Usage: bin/hbase " + NAME
101+
+ " <SRC_REPLICATION_PEER_STORAGE> <DST_REPLICATION_PEER_STORAGE>");
102+
System.err.println("The possible values for replication storage type:");
103+
for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) {
104+
System.err.println(" " + type.name().toLowerCase());
105+
}
106+
return -1;
107+
}
108+
FileSystem fs = FileSystem.get(getConf());
109+
try (ZKWatcher zk = createZKWatcher()) {
110+
ReplicationPeerStorage src = create(args[0], fs, zk);
111+
ReplicationPeerStorage dst = create(args[1], fs, zk);
112+
migrate(src, dst);
113+
}
114+
return 0;
115+
}
116+
117+
public static void main(String[] args) throws Exception {
118+
Configuration conf = HBaseConfiguration.create();
119+
int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args);
120+
System.exit(ret);
121+
}
122+
}

0 commit comments

Comments
 (0)