Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -746,26 +744,6 @@ public ServerStateNode addRegionToServer(final RegionStateNode regionNode) {
return serverNode;
}

public boolean isReplicaAvailableForRegion(final RegionInfo info) {
// if the region info itself is a replica return true.
if (!RegionReplicaUtil.isDefaultReplica(info)) {
return true;
}
// iterate the regionsMap for the given region name. If there are replicas it should
// list them in order.
for (RegionStateNode node : regionsMap.tailMap(info.getRegionName()).values()) {
if (!node.getTable().equals(info.getTable())
|| !ServerRegionReplicaUtil.isReplicasForSameRegion(info, node.getRegionInfo())) {
break;
} else if (!RegionReplicaUtil.isDefaultReplica(node.getRegionInfo())) {
// we have replicas
return true;
}
}
// we don have replicas
return false;
}

public ServerStateNode removeRegionFromServer(final ServerName serverName,
final RegionStateNode regionNode) {
ServerStateNode serverNode = getOrCreateServer(serverName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -48,12 +49,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
Expand Down Expand Up @@ -1263,7 +1263,7 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
return assignments;
}

Cluster cluster = createCluster(servers, regions, false);
Cluster cluster = createCluster(servers, regions);
List<RegionInfo> unassignedRegions = new ArrayList<>();

roundRobinAssignment(cluster, regions, unassignedRegions,
Expand Down Expand Up @@ -1319,8 +1319,24 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
return assignments;
}

protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
boolean hasRegionReplica) {
protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
throws HBaseIOException {
boolean hasRegionReplica = false;
try {
if (services != null && services.getTableDescriptors() != null) {
Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
for (RegionInfo regionInfo : regions) {
TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
if (td != null && td.getRegionReplication() > 1) {
hasRegionReplica = true;
break;
}
}
}
} catch (IOException ioe) {
throw new HBaseIOException(ioe);
}

// Get the snapshot of the current assignments for the regions in question, and then create
// a cluster out of it. Note that we might have replicas already assigned to some servers
// earlier. So we want to get the snapshot to see those assignments, but this will only contain
Expand Down Expand Up @@ -1380,7 +1396,7 @@ public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> serve
final List<ServerName> finalServers = idleServers.isEmpty() ?
servers : idleServers;
List<RegionInfo> regions = Lists.newArrayList(regionInfo);
Cluster cluster = createCluster(finalServers, regions, false);
Cluster cluster = createCluster(finalServers, regions);
return randomAssignment(cluster, regionInfo, finalServers);
}

Expand Down Expand Up @@ -1452,21 +1468,9 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server

int numRandomAssignments = 0;
int numRetainedAssigments = 0;
boolean hasRegionReplica = false;
for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
RegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
// In the current set of regions even if one has region replica let us go with
// getting the entire snapshot
if (this.services != null) { // for tests
AssignmentManager am = this.services.getAssignmentManager();
if (am != null) {
RegionStates states = am.getRegionStates();
if (!hasRegionReplica && states != null && states.isReplicaAvailableForRegion(region)) {
hasRegionReplica = true;
}
}
}
List<ServerName> localServers = new ArrayList<>();
if (oldServerName != null) {
localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
Expand Down Expand Up @@ -1506,7 +1510,7 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server

// If servers from prior assignment aren't present, then lets do randomAssignment on regions.
if (randomAssignRegions.size() > 0) {
Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
Cluster cluster = createCluster(servers, regions.keySet());
for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
ServerName sn = entry.getKey();
for (RegionInfo region : entry.getValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -43,6 +44,7 @@
import org.apache.hadoop.hbase.MetaTableAccessor.Visitor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand All @@ -58,6 +60,7 @@
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -147,15 +150,7 @@ public void testCreateTableWithMultipleReplicas() throws Exception {

List<RegionInfo> hris = MetaTableAccessor.getTableRegions(ADMIN.getConnection(), tableName);
assertEquals(numRegions * numReplica, hris.size());
// check that the master created expected number of RegionState objects
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < numReplica; j++) {
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getRegionState(replica);
assertNotNull(state);
}
}
assertRegionStateNotNull(hris, numRegions, numReplica);

List<Result> metaRows = MetaTableAccessor.fullScanRegions(ADMIN.getConnection());
int numRows = 0;
Expand Down Expand Up @@ -184,14 +179,26 @@ public void testCreateTableWithMultipleReplicas() throws Exception {
TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
TEST_UTIL.waitUntilNoRegionsInTransition();
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < numReplica; j++) {
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getRegionState(replica);
assertNotNull(state);
}
assertRegionStateNotNull(hris, numRegions, numReplica);
validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
ADMIN.getConnection());

// Now shut the whole cluster down, and verify the assignments are kept so that the
// availability constraints are met. MiniHBaseCluster chooses arbitrary ports on each
// restart. This messes with our being able to test that we retain locality. Therefore,
// figure current cluster ports and pass them in on next cluster start so new cluster comes
// up at same coordinates -- and the assignment retention logic has a chance to cut in.
List<Integer> rsports = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread rst : TEST_UTIL.getHBaseCluster()
.getLiveRegionServerThreads()) {
rsports.add(rst.getRegionServer().getRpcServer().getListenerAddress().getPort());
}
TEST_UTIL.shutdownMiniHBaseCluster();
StartMiniClusterOption option =
StartMiniClusterOption.builder().numRegionServers(numSlaves).rsPorts(rsports).build();
TEST_UTIL.startMiniHBaseCluster(option);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
TEST_UTIL.waitUntilNoRegionsInTransition();
validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
ADMIN.getConnection());

Expand Down Expand Up @@ -255,6 +262,19 @@ public void testCreateTableWithMultipleReplicas() throws Exception {
}
}

private void assertRegionStateNotNull(List<RegionInfo> hris, int numRegions, int numReplica) {
// check that the master created expected number of RegionState objects
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < numReplica; j++) {
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
RegionState state =
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
.getRegionState(replica);
assertNotNull(state);
}
}
}

@Test
@Ignore("Enable when we have support for alter_table- HBASE-10361")
public void testIncompleteMetaTableReplicaInformation() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
Expand All @@ -29,11 +32,14 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
Expand Down Expand Up @@ -125,6 +131,7 @@ protected void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doub
long procId = getSCPProcId(procExec);
ProcedureTestingUtility.waitProcedure(procExec, procId);
}
assertReplicaDistributed(t);
assertEquals(count, HBaseTestingUtility.countRows(t));
assertEquals(checksum, util.checksumRows(t));
}
Expand All @@ -135,6 +142,36 @@ protected long getSCPProcId(ProcedureExecutor<?> procExec) {
return procExec.getActiveProcIds().stream().mapToLong(Long::longValue).min().getAsLong();
}

private void assertReplicaDistributed(Table t) throws IOException {
if (t.getDescriptor().getRegionReplication() <= 1) {
return;
}
// Assert all data came back.
List<RegionInfo> regionInfos = new ArrayList<>();
for (RegionServerThread rs : this.util.getMiniHBaseCluster().getRegionServerThreads()) {
regionInfos.clear();
for (Region r : rs.getRegionServer().getRegions(t.getName())) {
LOG.info("The region is " + r.getRegionInfo() + " the location is " +
rs.getRegionServer().getServerName());
if (contains(regionInfos, r.getRegionInfo())) {
LOG.error("Am exiting");
fail("Replica regions should be assigned to different region servers");
} else {
regionInfos.add(r.getRegionInfo());
}
}
}
}

private boolean contains(List<RegionInfo> regionInfos, RegionInfo regionInfo) {
for (RegionInfo info : regionInfos) {
if (RegionReplicaUtil.isReplicasForSameRegion(info, regionInfo)) {
return true;
}
}
return false;
}

protected Table createTable(final TableName tableName) throws IOException {
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, getRegionReplication());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
Expand All @@ -37,7 +39,6 @@
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -64,31 +65,27 @@ public class TestRegionReplicasWithRestartScenarios {

private static final int NB_SERVERS = 3;
private Table table;
private TableName tableName;

private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;

@BeforeClass
public static void beforeClass() throws Exception {
// Reduce the hdfs block size and prefetch to trigger the file-link reopen
// when the file is moved to archive (e.g. compaction)
HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", 3);
HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", NB_SERVERS);
HTU.startMiniCluster(NB_SERVERS);
}

@Before
public void before() throws IOException {
TableName tableName = TableName.valueOf(this.name.getMethodName());
// Create table then get the single region for our new table.
this.table = createTableDirectlyFromHTD(tableName);
this.tableName = TableName.valueOf(this.name.getMethodName());
this.table = createTableDirectlyFromHTD(this.tableName);
}

@After
public void after() throws IOException {
this.table.close();
HTU.deleteTable(this.tableName);
}

private static Table createTableDirectlyFromHTD(final TableName tableName) throws IOException {
Expand Down Expand Up @@ -125,6 +122,20 @@ private HRegionServer getTertiaryRS() {

@Test
public void testRegionReplicasCreated() throws Exception {
assertReplicaDistributed();
}

@Test
public void testWhenRestart() throws Exception {
ServerName serverName = getRS().getServerName();
HTU.getHBaseCluster().stopRegionServer(serverName);
HTU.getHBaseCluster().waitForRegionServerToStop(serverName, 60000);
HTU.getHBaseCluster().startRegionServerAndWait(60000);
HTU.waitTableAvailable(this.tableName);
assertReplicaDistributed();
}

private void assertReplicaDistributed() throws Exception {
Collection<HRegion> onlineRegions = getRS().getOnlineRegionsLocalContext();
boolean res = checkDuplicates(onlineRegions);
assertFalse(res);
Expand All @@ -150,7 +161,7 @@ private boolean checkDuplicates(Collection<HRegion> onlineRegions3) throws Excep
RegionReplicaUtil.getRegionInfoForDefaultReplica(actualRegion.getRegionInfo()))) {
i++;
if (i > 1) {
LOG.info("Duplicate found " + actualRegion.getRegionInfo() + " " +
LOG.warn("Duplicate found {} and {}", actualRegion.getRegionInfo(),
region.getRegionInfo());
assertTrue(Bytes.equals(region.getRegionInfo().getStartKey(),
actualRegion.getRegionInfo().getStartKey()));
Expand Down