Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/hbase
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ show_usage() {
echo " pre-upgrade Run Pre-Upgrade validator tool"
echo " hbtop Run HBTop tool"
echo " credential Run the Hadoop Credential Shell"
echo " copyreppeers Run CopyReplicationPeers tool"
echo " CLASSNAME Run the class named CLASSNAME"
}

Expand Down Expand Up @@ -769,6 +770,8 @@ elif [ "$COMMAND" = "hbtop" ] ; then
HBASE_OPTS="${HBASE_OPTS} ${HBASE_HBTOP_OPTS}"
elif [ "$COMMAND" = "credential" ] ; then
CLASS='org.apache.hadoop.security.alias.CredentialShell'
elif [ "$COMMAND" = "copyreppeers" ] ; then
CLASS='org.apache.hadoop.hbase.replication.ReplicationPeerMigrationTool'
else
CLASS=$COMMAND
if [[ "$CLASS" =~ .*IntegrationTest.* ]] ; then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;

/**
* A helper tool for generating random {@link ReplicationPeerConfig} and do assertion.
*/
/**
 * Test helper for generating pseudo-random {@link ReplicationPeerConfig} instances and for
 * asserting that two configs carry the same data.
 * <p/>
 * All randomness flows through one shared {@link Random} that {@link #getConfig(int)} re-seeds,
 * so the config produced for a given seed is always the same.
 */
public final class ReplicationPeerConfigTestUtil {

  // Shared generator; getConfig(int) re-seeds it so output is deterministic per seed.
  private static final Random RNG = new Random();

  private ReplicationPeerConfigTestUtil() {
    // Static utility class: never instantiated.
  }

  /** Returns between 0 and 4 random hex-string namespace names. */
  private static Set<String> randNamespaces(Random rand) {
    int count = rand.nextInt(5);
    return Stream.generate(rand::nextLong).map(Long::toHexString).limit(count).collect(toSet());
  }

  /** Returns a map of 0-4 random tables, each mapped to 0-4 random column family names. */
  private static Map<TableName, List<String>> randTableCFs(Random rand) {
    Map<TableName, List<String>> tableCFs = new HashMap<>();
    int tableCount = rand.nextInt(5);
    for (int i = 0; i < tableCount; i++) {
      TableName table = TableName.valueOf(Long.toHexString(rand.nextLong()));
      int cfCount = rand.nextInt(5);
      List<String> families =
        Stream.generate(rand::nextLong).map(Long::toHexString).limit(cfCount).collect(toList());
      tableCFs.put(table, families);
    }
    return tableCFs;
  }

  /**
   * Builds a {@link ReplicationPeerConfig} whose fields are filled with pseudo-random values
   * derived from {@code seed}. Calling this twice with the same seed yields equal configs.
   */
  public static ReplicationPeerConfig getConfig(int seed) {
    RNG.setSeed(seed);
    String clusterKey = Long.toHexString(RNG.nextLong());
    String endpointImpl = Long.toHexString(RNG.nextLong());
    String remoteWALDir = Long.toHexString(RNG.nextLong());
    return ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey)
      .setReplicationEndpointImpl(endpointImpl).setRemoteWALDir(remoteWALDir)
      .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG))
      .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG))
      .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build();
  }

  /** Asserts two namespace sets hold the same elements; null and empty are considered equal. */
  private static void assertSetEquals(Set<String> expected, Set<String> actual) {
    if (expected == null || expected.isEmpty()) {
      assertTrue(actual == null || actual.isEmpty());
      return;
    }
    assertEquals(expected.size(), actual.size());
    for (String element : expected) {
      assertTrue(actual.contains(element));
    }
  }

  /**
   * Asserts two table-to-CFs maps hold the same entries; a null/empty map (or CF list) on one
   * side only matches a null/empty one on the other side.
   */
  private static void assertMapEquals(Map<TableName, List<String>> expected,
    Map<TableName, List<String>> actual) {
    if (expected == null || expected.isEmpty()) {
      assertTrue(actual == null || actual.isEmpty());
      return;
    }
    assertEquals(expected.size(), actual.size());
    for (Map.Entry<TableName, List<String>> entry : expected.entrySet()) {
      TableName table = entry.getKey();
      List<String> expectedCFs = entry.getValue();
      List<String> actualCFs = actual.get(table);
      if (expectedCFs == null || expectedCFs.isEmpty()) {
        // The key must still be present, even when it maps to nothing.
        assertTrue(actual.containsKey(table));
        assertTrue(actualCFs == null || actualCFs.isEmpty());
      } else {
        assertNotNull(actualCFs);
        assertEquals(expectedCFs.size(), actualCFs.size());
        Iterator<String> actualIt = actualCFs.iterator();
        for (String expectedCF : expectedCFs) {
          assertEquals(expectedCF, actualIt.next());
        }
      }
    }
  }

  /**
   * Asserts every compared field of the two configs matches.
   * <p/>
   * NOTE(review): getConfig fills remoteWALDir, but it is not compared here -- confirm that is
   * intentional.
   */
  public static void assertConfigEquals(ReplicationPeerConfig expected,
    ReplicationPeerConfig actual) {
    assertEquals(expected.getClusterKey(), actual.getClusterKey());
    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
    assertEquals(expected.getBandwidth(), actual.getBandwidth());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,14 @@
*/
package org.apache.hadoop.hbase.replication;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.hbase.replication.ReplicationPeerConfigTestUtil.getConfig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
Expand Down Expand Up @@ -260,35 +255,6 @@ public void testNeedToReplicateCFWithoutReplicatingAll() {
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}

// Shared generator, re-seeded by getConfig(int) so generated configs are reproducible per seed.
private static final Random RNG = new Random(); // Seed may be set with Random#setSeed

// Returns between 0 and 4 random hex-string namespace names.
// NOTE(review): duplicated by ReplicationPeerConfigTestUtil.randNamespaces (see the new static
// import of getConfig above) -- prefer the shared util.
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}

// Returns a map of 0-4 random tables, each mapped to 0-4 random column family names.
// NOTE(review): duplicated by ReplicationPeerConfigTestUtil.randTableCFs -- prefer the shared
// util.
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}

// Builds a ReplicationPeerConfig with pseudo-random field values derived from seed; the same
// seed always yields the same config because RNG is re-seeded first.
// NOTE(review): duplicated by ReplicationPeerConfigTestUtil.getConfig (statically imported
// above) -- prefer the shared util.
private ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
.setBandwidth(RNG.nextInt(1000)).build();
}

@Test
public void testBaseReplicationPeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
Expand Down
6 changes: 6 additions & 0 deletions hbase-replication/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A tool for copying replication peer data across different replication peer storages.
* <p/>
* Notice that we will not delete the replication peer data from the source storage, as this tool
* can also be used by online migration. See HBASE-27110 for the whole design.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CopyReplicationPeers extends Configured implements Tool {

private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class);

public static final String NAME = "copyreppeers";

public CopyReplicationPeers(Configuration conf) {
super(conf);
}

private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) {
Configuration conf = new Configuration(getConf());
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type);
return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
}

private ZKWatcher createZKWatcher() throws IOException {
return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() {

private volatile boolean aborted;

@Override
public boolean isAborted() {
return aborted;
}

@Override
public void abort(String why, Throwable e) {
aborted = true;
LOG.error(why, e);
System.exit(1);
}
});
}

private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst)
throws ReplicationException {
LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(),
dst.getClass().getSimpleName());
for (String peerId : src.listPeerIds()) {
LOG.info("Going to migrate {}", peerId);
ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId);
boolean enabled = src.isPeerEnabled(peerId);
SyncReplicationState syncState = src.getPeerSyncReplicationState(peerId);
SyncReplicationState newSyncState = src.getPeerNewSyncReplicationState(peerId);
if (newSyncState != SyncReplicationState.NONE) {
throw new IllegalStateException("Can not migrate peer " + peerId
+ " as it is in an intermediate state, syncReplicationState is " + syncState
+ " while newSyncReplicationState is " + newSyncState);
}
dst.addPeer(peerId, peerConfig, enabled, syncState);
LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}, syncReplicationState = {}",
peerId, peerConfig, enabled, syncState);
}
}

@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: bin/hbase " + NAME
+ " <SRC_REPLICATION_PEER_STORAGE> <DST_REPLICATION_PEER_STORAGE>");
System.err.println("The possible values for replication storage type:");
for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) {
System.err.println(" " + type.name().toLowerCase());
}
return -1;
}
FileSystem fs = FileSystem.get(getConf());
try (ZKWatcher zk = createZKWatcher()) {
ReplicationPeerStorage src = create(args[0], fs, zk);
ReplicationPeerStorage dst = create(args[1], fs, zk);
migrate(src, dst);
}
return 0;
}

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args);
System.exit(ret);
}
}
Loading