Skip to content

Commit c003951

Browse files
committed
HBASE-29380 Add a UT to reproduce
1 parent a71288f commit c003951

File tree

1 file changed

+164
-0
lines changed

1 file changed

+164
-0
lines changed
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import java.util.concurrent.BrokenBarrierException;
21+
import java.util.concurrent.CyclicBarrier;
22+
import org.apache.hadoop.hbase.HBaseClassTestRule;
23+
import org.apache.hadoop.hbase.HBaseTestingUtil;
24+
import org.apache.hadoop.hbase.master.replication.AbstractPeerNoLockProcedure;
25+
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
26+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
27+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
28+
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
29+
import org.apache.hadoop.hbase.testclassification.LargeTests;
30+
import org.apache.hadoop.hbase.testclassification.MasterTests;
31+
import org.junit.AfterClass;
32+
import org.junit.BeforeClass;
33+
import org.junit.ClassRule;
34+
import org.junit.Test;
35+
import org.junit.experimental.categories.Category;
36+
37+
/**
38+
* Testcase for HBASE-29380
39+
*/
40+
@Category({ MasterTests.class, LargeTests.class })
41+
public class TestProcedureWaitAndWake {
42+
43+
@ClassRule
44+
public static final HBaseClassTestRule CLASS_RULE =
45+
HBaseClassTestRule.forClass(TestProcedureWaitAndWake.class);
46+
47+
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
48+
49+
public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {
50+
51+
private final CyclicBarrier barrier;
52+
53+
public MyPeerProcedure() {
54+
this(null);
55+
}
56+
57+
public MyPeerProcedure(CyclicBarrier barrier) {
58+
super("1");
59+
this.barrier = barrier;
60+
}
61+
62+
@Override
63+
public PeerOperationType getPeerOperationType() {
64+
return PeerOperationType.REMOVE;
65+
}
66+
67+
@Override
68+
protected LockState acquireLock(MasterProcedureEnv env) {
69+
// make sure we have two procedure arrive here at the same time, so one of them will enter the
70+
// lock wait state
71+
try {
72+
barrier.await();
73+
} catch (InterruptedException | BrokenBarrierException e) {
74+
throw new RuntimeException(e);
75+
}
76+
return super.acquireLock(env);
77+
}
78+
79+
@Override
80+
protected Flow executeFromState(MasterProcedureEnv env, Integer state)
81+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
82+
if (state.intValue() == 0) {
83+
setNextState(1);
84+
addChildProcedure(new MySubPeerProcedure());
85+
return Flow.HAS_MORE_STATE;
86+
} else {
87+
Thread.sleep(200);
88+
return Flow.NO_MORE_STATE;
89+
}
90+
}
91+
92+
@Override
93+
protected Integer getState(int stateId) {
94+
return Integer.valueOf(stateId);
95+
}
96+
97+
@Override
98+
protected int getStateId(Integer state) {
99+
return state.intValue();
100+
}
101+
102+
@Override
103+
protected Integer getInitialState() {
104+
return 0;
105+
}
106+
107+
public static final class MySubPeerProcedure extends AbstractPeerNoLockProcedure<Integer> {
108+
109+
public MySubPeerProcedure() {
110+
super("1");
111+
}
112+
113+
@Override
114+
public PeerOperationType getPeerOperationType() {
115+
return PeerOperationType.REFRESH;
116+
}
117+
118+
@Override
119+
protected Flow executeFromState(MasterProcedureEnv env, Integer state)
120+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
121+
return Flow.NO_MORE_STATE;
122+
}
123+
124+
@Override
125+
protected Integer getState(int stateId) {
126+
return Integer.valueOf(stateId);
127+
}
128+
129+
@Override
130+
protected int getStateId(Integer state) {
131+
return state.intValue();
132+
}
133+
134+
@Override
135+
protected Integer getInitialState() {
136+
return 0;
137+
}
138+
}
139+
}
140+
141+
@BeforeClass
142+
public static void setUp() throws Exception {
143+
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
144+
UTIL.startMiniCluster(3);
145+
}
146+
147+
@AfterClass
148+
public static void tearDown() throws Exception {
149+
UTIL.shutdownMiniCluster();
150+
}
151+
152+
@Test
153+
public void testPeerProcedure() {
154+
ProcedureExecutor<MasterProcedureEnv> procExec =
155+
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
156+
CyclicBarrier barrier = new CyclicBarrier(2);
157+
MyPeerProcedure p1 = new MyPeerProcedure(barrier);
158+
MyPeerProcedure p2 = new MyPeerProcedure(barrier);
159+
long id1 = procExec.submitProcedure(p1);
160+
long id2 = procExec.submitProcedure(p2);
161+
UTIL.waitFor(10000, () -> procExec.isFinished(id1));
162+
UTIL.waitFor(10000, () -> procExec.isFinished(id2));
163+
}
164+
}

0 commit comments

Comments
 (0)