Skip to content

Commit e88daed

Browse files
authored
HBASE-28210 There could be holes in stack ids when loading procedures (#5531)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 9631af7 commit e88daed

File tree

2 files changed

+234
-4
lines changed

2 files changed

+234
-4
lines changed

hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1695,9 +1695,6 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
16951695
}
16961696
}
16971697

1698-
// Add the procedure to the stack
1699-
procStack.addRollbackStep(procedure);
1700-
17011698
// allows to kill the executor before something is stored to the wal.
17021699
// useful to test the procedure recovery.
17031700
if (
@@ -1715,7 +1712,12 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
17151712
// Commit the transaction even if a suspend (state may have changed). Note this append
17161713
// can take a bunch of time to complete.
17171714
if (procedure.needPersistence()) {
1718-
updateStoreOnExec(procStack, procedure, subprocs);
1715+
// Add the procedure to the stack
1716+
// See HBASE-28210 on why we need synchronized here
1717+
synchronized (procStack) {
1718+
procStack.addRollbackStep(procedure);
1719+
updateStoreOnExec(procStack, procedure, subprocs);
1720+
}
17191721
}
17201722

17211723
// if the store is not running we are aborting
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.procedure2;
19+
20+
import java.io.IOException;
21+
import java.io.UncheckedIOException;
22+
import java.util.LinkedHashMap;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import org.apache.hadoop.hbase.HBaseClassTestRule;
26+
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
27+
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
28+
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
29+
import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
30+
import org.apache.hadoop.hbase.testclassification.MasterTests;
31+
import org.apache.hadoop.hbase.testclassification.SmallTests;
32+
import org.apache.hadoop.hbase.util.AtomicUtils;
33+
import org.junit.After;
34+
import org.junit.Before;
35+
import org.junit.ClassRule;
36+
import org.junit.Test;
37+
import org.junit.experimental.categories.Category;
38+
39+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
40+
41+
/**
42+
* Testcase for HBASE-28210, where we persist the procedure which has been inserted later to
43+
* {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading,
44+
* and finally fail the start up of master.
45+
*/
46+
@Category({ MasterTests.class, SmallTests.class })
47+
public class TestStackIdHoles {
48+
49+
@ClassRule
50+
public static final HBaseClassTestRule CLASS_RULE =
51+
HBaseClassTestRule.forClass(TestStackIdHoles.class);
52+
53+
private final class DummyProcedureStore extends ProcedureStoreBase {
54+
55+
private int numThreads;
56+
57+
private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap =
58+
new LinkedHashMap<Long, ProcedureProtos.Procedure>();
59+
60+
private final AtomicLong maxProcId = new AtomicLong(0);
61+
62+
private final AtomicBoolean updated = new AtomicBoolean(false);
63+
64+
@Override
65+
public void start(int numThreads) throws IOException {
66+
this.numThreads = numThreads;
67+
setRunning(true);
68+
}
69+
70+
@Override
71+
public void stop(boolean abort) {
72+
}
73+
74+
@Override
75+
public int getNumThreads() {
76+
return numThreads;
77+
}
78+
79+
@Override
80+
public int setRunningProcedureCount(int count) {
81+
return count;
82+
}
83+
84+
@Override
85+
public void recoverLease() throws IOException {
86+
}
87+
88+
@Override
89+
public void load(ProcedureLoader loader) throws IOException {
90+
loader.setMaxProcId(maxProcId.get());
91+
ProcedureTree tree = ProcedureTree.build(procMap.values());
92+
loader.load(tree.getValidProcs());
93+
loader.handleCorrupted(tree.getCorruptedProcs());
94+
}
95+
96+
@Override
97+
public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
98+
long max = proc.getProcId();
99+
synchronized (procMap) {
100+
try {
101+
procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
102+
if (subprocs != null) {
103+
for (Procedure<?> p : subprocs) {
104+
procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
105+
max = Math.max(max, p.getProcId());
106+
}
107+
}
108+
} catch (IOException e) {
109+
throw new UncheckedIOException(e);
110+
}
111+
}
112+
AtomicUtils.updateMax(maxProcId, max);
113+
}
114+
115+
@Override
116+
public void insert(Procedure<?>[] procs) {
117+
long max = -1;
118+
synchronized (procMap) {
119+
try {
120+
for (Procedure<?> p : procs) {
121+
procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
122+
max = Math.max(max, p.getProcId());
123+
}
124+
} catch (IOException e) {
125+
throw new UncheckedIOException(e);
126+
}
127+
}
128+
AtomicUtils.updateMax(maxProcId, max);
129+
}
130+
131+
@Override
132+
public void update(Procedure<?> proc) {
133+
// inject a sleep to simulate the scenario in HBASE-28210
134+
if (proc.hasParent() && proc.getStackIndexes() != null) {
135+
int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1];
136+
try {
137+
// sleep more times if the stack id is smaller
138+
Thread.sleep(100L * (10 - lastStackId));
139+
} catch (InterruptedException e) {
140+
Thread.currentThread().interrupt();
141+
return;
142+
}
143+
// simulate the failure when updating the second sub procedure
144+
if (!updated.compareAndSet(false, true)) {
145+
procExec.stop();
146+
throw new RuntimeException("inject error");
147+
}
148+
}
149+
synchronized (procMap) {
150+
try {
151+
procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
152+
} catch (IOException e) {
153+
throw new UncheckedIOException(e);
154+
}
155+
}
156+
}
157+
158+
@Override
159+
public void delete(long procId) {
160+
synchronized (procMap) {
161+
procMap.remove(procId);
162+
}
163+
}
164+
165+
@Override
166+
public void delete(Procedure<?> parentProc, long[] subProcIds) {
167+
synchronized (procMap) {
168+
try {
169+
procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc));
170+
for (long procId : subProcIds) {
171+
procMap.remove(procId);
172+
}
173+
} catch (IOException e) {
174+
throw new UncheckedIOException(e);
175+
}
176+
}
177+
}
178+
179+
@Override
180+
public void delete(long[] procIds, int offset, int count) {
181+
synchronized (procMap) {
182+
for (int i = 0; i < count; i++) {
183+
long procId = procIds[offset + i];
184+
procMap.remove(procId);
185+
}
186+
}
187+
}
188+
}
189+
190+
private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil();
191+
192+
private DummyProcedureStore procStore;
193+
194+
private ProcedureExecutor<Void> procExec;
195+
196+
@Before
197+
public void setUp() throws IOException {
198+
procStore = new DummyProcedureStore();
199+
procStore.start(4);
200+
procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
201+
procExec.init(4, true);
202+
procExec.startWorkers();
203+
}
204+
205+
@After
206+
public void tearDown() {
207+
procExec.stop();
208+
}
209+
210+
public static class DummyProcedure extends NoopProcedure<Void> {
211+
212+
@Override
213+
protected Procedure<Void>[] execute(Void env)
214+
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
215+
return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() };
216+
}
217+
}
218+
219+
@Test
220+
public void testLoad() throws IOException {
221+
procExec.submitProcedure(new DummyProcedure());
222+
// wait for the error
223+
HBTU.waitFor(30000, () -> !procExec.isRunning());
224+
procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
225+
// make sure there is no error while loading
226+
procExec.init(4, true);
227+
}
228+
}

0 commit comments

Comments
 (0)