Skip to content

Commit 2332a30

Browse files
committed
HBASE-28582 ModifyTableProcedure should not reset TRSP on region node when closing unused region replicas (#5903)
Signed-off-by: Viraj Jasani <vjasani@apache.org> (cherry picked from commit c4a7606)
1 parent 771e760 commit 2332a30

File tree

6 files changed

+335
-10
lines changed

6 files changed

+335
-10
lines changed

hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,3 +612,13 @@ message ModifyStoreFileTrackerStateData {
612612
message ModifyColumnFamilyStoreFileTrackerStateData {
613613
required bytes family = 1;
614614
}
615+
616+
// States of the CloseExcessRegionReplicasProcedure state machine.
enum CloseExcessRegionReplicasProcedureState {
617+
// Initial state: submit unassign procedures for the excess region replicas.
CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE = 1;
618+
// Count the excess replicas that are still not closed; if any remain, go back to SCHEDULE.
CLOSE_EXCESS_REGION_REPLICAS_CONFIRM = 2;
619+
}
620+
621+
// Persisted state data of CloseExcessRegionReplicasProcedure.
message CloseExcessRegionReplicasProcedureStateData {
622+
// The table whose excess region replicas are being closed.
required TableName table_name = 1;
623+
// The new replica count; replicas whose replica id is >= this value are the excess ones.
required uint32 new_replica_count = 2;
624+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.concurrent.locks.Condition;
3434
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.function.Consumer;
3536
import java.util.stream.Collectors;
3637
import java.util.stream.Stream;
3738
import org.apache.hadoop.conf.Configuration;
@@ -1016,14 +1017,55 @@ public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableN
10161017
}
10171018

10181019
/**
1019-
* Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1020+
* Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will
1021+
* skip submitting an unassign procedure if the region is in transition, so you may need to call this
1022+
* method multiple times.
1023+
* @param tableName the table for closing excess region replicas
1024+
* @param newReplicaCount the new replica count, should be less than current replica count
1025+
* @param submit for submitting procedure
1026+
* @return the number of regions in transition for which we could not schedule unassign procedures
10201027
*/
1021-
public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1022-
TableName tableName, int newReplicaCount) {
1023-
return regionStates.getTableRegionStateNodes(tableName).stream()
1024-
.filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1025-
.map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1026-
.toArray(TransitRegionStateProcedure[]::new);
1028+
public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName,
1029+
int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) {
1030+
int inTransitionCount = 0;
1031+
for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1032+
regionNode.lock();
1033+
try {
1034+
if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) {
1035+
if (regionNode.isInTransition()) {
1036+
LOG.debug("skip scheduling unassign procedure for {} when closing excess region "
1037+
+ "replicas since it is in transition", regionNode);
1038+
inTransitionCount++;
1039+
continue;
1040+
}
1041+
if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1042+
continue;
1043+
}
1044+
submit.accept(regionNode.setProcedure(TransitRegionStateProcedure
1045+
.unassign(getProcedureEnvironment(), regionNode.getRegionInfo())));
1046+
}
1047+
} finally {
1048+
regionNode.unlock();
1049+
}
1050+
}
1051+
return inTransitionCount;
1052+
}
1053+
1054+
public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) {
1055+
int unclosed = 0;
1056+
for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1057+
regionNode.lock();
1058+
try {
1059+
if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) {
1060+
if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1061+
unclosed++;
1062+
}
1063+
}
1064+
} finally {
1065+
regionNode.unlock();
1066+
}
1067+
}
1068+
return unclosed;
10271069
}
10281070

10291071
public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import java.io.IOException;
21+
import org.apache.commons.lang3.mutable.MutableBoolean;
22+
import org.apache.hadoop.hbase.TableName;
23+
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
24+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
25+
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
26+
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
27+
import org.apache.hadoop.hbase.util.RetryCounter;
28+
import org.apache.yetus.audience.InterfaceAudience;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
33+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseExcessRegionReplicasProcedureState;
34+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseExcessRegionReplicasProcedureStateData;
35+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
36+
37+
/**
38+
* Procedure for close excess region replicas.
39+
*/
40+
@InterfaceAudience.Private
41+
public class CloseExcessRegionReplicasProcedure
42+
extends AbstractStateMachineTableProcedure<CloseExcessRegionReplicasProcedureState> {
43+
44+
private static final Logger LOG =
45+
LoggerFactory.getLogger(CloseExcessRegionReplicasProcedure.class);
46+
47+
private TableName tableName;
48+
private int newReplicaCount;
49+
50+
private RetryCounter retryCounter;
51+
52+
public CloseExcessRegionReplicasProcedure() {
53+
}
54+
55+
public CloseExcessRegionReplicasProcedure(TableName tableName, int newReplicaCount) {
56+
this.tableName = tableName;
57+
this.newReplicaCount = newReplicaCount;
58+
}
59+
60+
@Override
61+
public TableName getTableName() {
62+
return tableName;
63+
}
64+
65+
@Override
66+
public TableOperationType getTableOperationType() {
67+
return TableOperationType.REGION_EDIT;
68+
}
69+
70+
@Override
71+
protected Flow executeFromState(MasterProcedureEnv env,
72+
CloseExcessRegionReplicasProcedureState state)
73+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
74+
LOG.trace("{} execute state={}", this, state);
75+
switch (state) {
76+
case CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE:
77+
MutableBoolean submitted = new MutableBoolean(false);
78+
int inTransitionCount = env.getAssignmentManager()
79+
.submitUnassignProcedureForClosingExcessRegionReplicas(tableName, newReplicaCount, p -> {
80+
submitted.setTrue();
81+
addChildProcedure(p);
82+
});
83+
if (inTransitionCount > 0 && submitted.isFalse()) {
84+
// we haven't scheduled any unassign procedures and there are still regions in
85+
// transition, sleep for a while and try again
86+
if (retryCounter == null) {
87+
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
88+
}
89+
long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
90+
LOG.info(
91+
"There are still {} region(s) in transition for table {} when closing excess"
92+
+ " region replicas, suspend {}secs and try again later",
93+
inTransitionCount, tableName, backoffMillis / 1000);
94+
suspend((int) backoffMillis, true);
95+
}
96+
setNextState(CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_CONFIRM);
97+
return Flow.HAS_MORE_STATE;
98+
case CLOSE_EXCESS_REGION_REPLICAS_CONFIRM:
99+
int unclosedCount = env.getAssignmentManager()
100+
.numberOfUnclosedExcessRegionReplicas(tableName, newReplicaCount);
101+
if (unclosedCount > 0) {
102+
LOG.info("There are still {} unclosed region(s) for table {} when closing excess"
103+
+ " region replicas, continue...");
104+
setNextState(
105+
CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE);
106+
return Flow.HAS_MORE_STATE;
107+
} else {
108+
return Flow.NO_MORE_STATE;
109+
}
110+
default:
111+
throw new UnsupportedOperationException("unhandled state=" + state);
112+
}
113+
}
114+
115+
@Override
116+
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
117+
setState(ProcedureProtos.ProcedureState.RUNNABLE);
118+
env.getProcedureScheduler().addFront(this);
119+
return false;
120+
}
121+
122+
@Override
123+
protected void rollbackState(MasterProcedureEnv env,
124+
CloseExcessRegionReplicasProcedureState state) throws IOException, InterruptedException {
125+
throw new UnsupportedOperationException();
126+
}
127+
128+
@Override
129+
protected CloseExcessRegionReplicasProcedureState getState(int stateId) {
130+
return CloseExcessRegionReplicasProcedureState.forNumber(stateId);
131+
}
132+
133+
@Override
134+
protected int getStateId(CloseExcessRegionReplicasProcedureState state) {
135+
return state.getNumber();
136+
}
137+
138+
@Override
139+
protected CloseExcessRegionReplicasProcedureState getInitialState() {
140+
return CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE;
141+
}
142+
143+
@Override
144+
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
145+
CloseExcessRegionReplicasProcedureStateData data = CloseExcessRegionReplicasProcedureStateData
146+
.newBuilder().setTableName(ProtobufUtil.toProtoTableName(tableName))
147+
.setNewReplicaCount(newReplicaCount).build();
148+
serializer.serialize(data);
149+
}
150+
151+
@Override
152+
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
153+
CloseExcessRegionReplicasProcedureStateData data =
154+
serializer.deserialize(CloseExcessRegionReplicasProcedureStateData.class);
155+
tableName = ProtobufUtil.toTableName(data.getTableName());
156+
newReplicaCount = data.getNewReplicaCount();
157+
}
158+
159+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,8 +495,7 @@ private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {
495495
if (newReplicaCount >= oldReplicaCount) {
496496
return;
497497
}
498-
addChildProcedure(env.getAssignmentManager()
499-
.createUnassignProceduresForClosingExcessRegionReplicas(getTableName(), newReplicaCount));
498+
addChildProcedure(new CloseExcessRegionReplicasProcedure(getTableName(), newReplicaCount));
500499
}
501500

502501
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.assignment;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
import org.apache.hadoop.hbase.HBaseClassTestRule;
26+
import org.apache.hadoop.hbase.HBaseTestingUtility;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.client.AsyncConnection;
29+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
30+
import org.apache.hadoop.hbase.client.ConnectionFactory;
31+
import org.apache.hadoop.hbase.client.TableDescriptor;
32+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
33+
import org.apache.hadoop.hbase.master.RegionState;
34+
import org.apache.hadoop.hbase.master.procedure.CloseExcessRegionReplicasProcedure;
35+
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
36+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
37+
import org.apache.hadoop.hbase.testclassification.MasterTests;
38+
import org.apache.hadoop.hbase.testclassification.MediumTests;
39+
import org.junit.AfterClass;
40+
import org.junit.BeforeClass;
41+
import org.junit.ClassRule;
42+
import org.junit.Test;
43+
import org.junit.experimental.categories.Category;
44+
45+
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
46+
47+
/**
48+
* A test to make sure that we will wait for RIT to finish while closing excess region replicas. See
49+
* HBASE-28582 and related issues for more details.
50+
*/
51+
@Category({ MasterTests.class, MediumTests.class })
52+
public class TestReduceExcessRegionReplicasBlockedByRIT {
53+
54+
@ClassRule
55+
public static final HBaseClassTestRule CLASS_RULE =
56+
HBaseClassTestRule.forClass(TestReduceExcessRegionReplicasBlockedByRIT.class);
57+
58+
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
59+
60+
private static AsyncConnection CONN;
61+
62+
private static TableDescriptor TD =
63+
TableDescriptorBuilder.newBuilder(TableName.valueOf("CloseExcessRegionReplicas"))
64+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("family")).setRegionReplication(4).build();
65+
66+
@BeforeClass
67+
public static void setUp() throws Exception {
68+
UTIL.startMiniCluster(1);
69+
UTIL.getAdmin().createTable(TD);
70+
UTIL.waitTableAvailable(TD.getTableName());
71+
UTIL.waitUntilNoRegionsInTransition();
72+
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
73+
}
74+
75+
@AfterClass
76+
public static void tearDown() throws Exception {
77+
Closeables.close(CONN, true);
78+
UTIL.shutdownMiniCluster();
79+
}
80+
81+
@Test
82+
public void testRIT() throws Exception {
83+
RegionStateNode rsn = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager()
84+
.getRegionStates().getTableRegionStateNodes(TD.getTableName()).stream()
85+
.filter(rn -> rn.getRegionInfo().getReplicaId() > 1).findAny().get();
86+
// fake a TRSP to block the CloseExcessRegionReplicasProcedure
87+
TransitRegionStateProcedure trsp = new TransitRegionStateProcedure();
88+
rsn.setProcedure(trsp);
89+
TableDescriptor newTd = TableDescriptorBuilder.newBuilder(TD).setRegionReplication(2).build();
90+
CompletableFuture<Void> future = CONN.getAdmin().modifyTable(newTd);
91+
ProcedureExecutor<MasterProcedureEnv> procExec =
92+
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
93+
UTIL.waitFor(5000, () -> procExec.getProcedures().stream()
94+
.anyMatch(p -> p instanceof CloseExcessRegionReplicasProcedure && !p.isFinished()));
95+
CloseExcessRegionReplicasProcedure proc =
96+
procExec.getProcedures().stream().filter(p -> p instanceof CloseExcessRegionReplicasProcedure)
97+
.map(p -> (CloseExcessRegionReplicasProcedure) p).findFirst().get();
98+
// make sure that the procedure can not finish
99+
for (int i = 0; i < 5; i++) {
100+
Thread.sleep(3000);
101+
assertFalse(proc.isFinished());
102+
}
103+
assertTrue(rsn.isInState(RegionState.State.OPEN));
104+
// unset the procedure, so we could make progress on CloseExcessRegionReplicasProcedure
105+
rsn.unsetProcedure(trsp);
106+
UTIL.waitFor(60000, () -> proc.isFinished());
107+
108+
future.get();
109+
110+
// the region should be in CLOSED state, and should have been removed from AM
111+
assertTrue(rsn.isInState(RegionState.State.CLOSED));
112+
// only 2 replicas now
113+
assertEquals(2, UTIL.getMiniHBaseCluster().getRegions(TD.getTableName()).size());
114+
}
115+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicasWithModifyTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public void testRegionReplicasByEnableTableWhenReplicaCountIsDecreasedWithMultip
157157
}
158158

159159
@Test
160-
public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreasedWithmultipleRegions()
160+
public void testRegionReplicasByEnableTableWhenReplicaCountIsIncreasedWithMultipleRegions()
161161
throws Exception {
162162
enableReplicationByModification(true, 2, 3, 15);
163163
}

0 commit comments

Comments
 (0)