Skip to content

Commit 3204c46

Browse files
committed
HBASE-29380 Two concurrent remove peer requests may hang (#7077) (#7093)
(cherry picked from commit 7cc2f54) Signed-off-by: Nihal Jain <nihaljain@apache.org> Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> (cherry picked from commit 368386d)
1 parent 551ff27 commit 3204c46

File tree

2 files changed

+240
-68
lines changed

2 files changed

+240
-68
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
*/
1818
package org.apache.hadoop.hbase.master.procedure;
1919

20+
import com.google.errorprone.annotations.RestrictedApi;
2021
import java.io.IOException;
2122
import java.util.Arrays;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Optional;
27+
import java.util.function.Consumer;
2628
import java.util.function.Function;
2729
import java.util.function.Supplier;
2830
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -418,7 +420,9 @@ private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fa
418420
// ============================================================================
419421
private TableQueue getTableQueue(TableName tableName) {
420422
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
421-
if (node != null) return node;
423+
if (node != null) {
424+
return node;
425+
}
422426

423427
node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
424428
locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
@@ -431,6 +435,21 @@ private void removeTableQueue(TableName tableName) {
431435
locking.removeTableLock(tableName);
432436
}
433437

438+
/**
439+
* Tries to remove the queue and the table-lock of the specified table. If there are new
440+
* operations pending (e.g. a new create), the remove will not be performed.
441+
* @param table the name of the table that should be marked as deleted
442+
* @param procedure the procedure that is removing the table
443+
* @return true if deletion succeeded, false otherwise meaning that there are other new operations
444+
* pending for that table (e.g. a new create).
445+
*/
446+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
447+
allowedOnPath = ".*/(MasterProcedureScheduler.java|src/test/.*)")
448+
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
449+
return tryCleanupQueue(table, procedure, () -> tableMap, TABLE_QUEUE_KEY_COMPARATOR,
450+
locking::getTableLock, tableRunQueue, this::removeTableQueue);
451+
}
452+
434453
private static boolean isTableProcedure(Procedure<?> proc) {
435454
return proc instanceof TableProcedureInterface;
436455
}
@@ -467,23 +486,10 @@ private void removeServerQueue(ServerName serverName) {
467486
}
468487

469488
private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
470-
schedLock();
471-
try {
472-
int index = getBucketIndex(serverBuckets, serverName.hashCode());
473-
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
474-
if (node == null) {
475-
return;
476-
}
477-
478-
LockAndQueue lock = locking.getServerLock(serverName);
479-
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
480-
removeFromRunQueue(serverRunQueue, node,
481-
() -> "clean up server queue after " + proc + " completed");
482-
removeServerQueue(serverName);
483-
}
484-
} finally {
485-
schedUnlock();
486-
}
489+
// serverBuckets
490+
tryCleanupQueue(serverName, proc,
491+
() -> serverBuckets[getBucketIndex(serverBuckets, serverName.hashCode())],
492+
SERVER_QUEUE_KEY_COMPARATOR, locking::getServerLock, serverRunQueue, this::removeServerQueue);
487493
}
488494

489495
private static int getBucketIndex(Object[] buckets, int hashCode) {
@@ -516,23 +522,9 @@ private void removePeerQueue(String peerId) {
516522
locking.removePeerLock(peerId);
517523
}
518524

519-
private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
520-
schedLock();
521-
try {
522-
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
523-
if (queue == null) {
524-
return;
525-
}
526-
527-
final LockAndQueue lock = locking.getPeerLock(peerId);
528-
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
529-
removeFromRunQueue(peerRunQueue, queue,
530-
() -> "clean up peer queue after " + procedure + " completed");
531-
removePeerQueue(peerId);
532-
}
533-
} finally {
534-
schedUnlock();
535-
}
525+
private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
526+
tryCleanupQueue(peerId, procedure, () -> peerMap, PEER_QUEUE_KEY_COMPARATOR,
527+
locking::getPeerLock, peerRunQueue, this::removePeerQueue);
536528
}
537529

538530
private static boolean isPeerProcedure(Procedure<?> proc) {
@@ -560,6 +552,35 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
560552
return proc instanceof MetaProcedureInterface;
561553
}
562554

555+
private <T extends Comparable<T>, TNode extends Queue<T>> boolean tryCleanupQueue(T id,
556+
Procedure<?> proc, Supplier<TNode> getMap, AvlKeyComparator<TNode> comparator,
557+
Function<T, LockAndQueue> getLock, FairQueue<T> runQueue, Consumer<T> removeQueue) {
558+
schedLock();
559+
try {
560+
Queue<T> queue = AvlTree.get(getMap.get(), id, comparator);
561+
if (queue == null) {
562+
return true;
563+
}
564+
565+
LockAndQueue lock = getLock.apply(id);
566+
if (queue.isEmpty() && lock.isWaitingQueueEmpty() && !lock.isLocked()) {
567+
// 1. the queue is empty
568+
// 2. no procedure is in the lock's waiting queue
569+
// 3. no other one holds the lock. It is possible that someone else holds the lock, usually
570+
// our parent procedures
571+
// If we can meet all the above conditions, it is safe for us to remove the queue
572+
removeFromRunQueue(runQueue, queue, () -> "clean up queue after " + proc + " completed");
573+
removeQueue.accept(id);
574+
return true;
575+
} else {
576+
return false;
577+
}
578+
579+
} finally {
580+
schedUnlock();
581+
}
582+
}
583+
563584
// ============================================================================
564585
// Table Locking Helpers
565586
// ============================================================================
@@ -699,39 +720,6 @@ public void wakeTableSharedLock(final Procedure<?> procedure, final TableName ta
699720
}
700721
}
701722

702-
/**
703-
* Tries to remove the queue and the table-lock of the specified table. If there are new
704-
* operations pending (e.g. a new create), the remove will not be performed.
705-
* @param table the name of the table that should be marked as deleted
706-
* @param procedure the procedure that is removing the table
707-
* @return true if deletion succeeded, false otherwise meaning that there are other new operations
708-
* pending for that table (e.g. a new create).
709-
*/
710-
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
711-
schedLock();
712-
try {
713-
final TableQueue queue = getTableQueue(table);
714-
final LockAndQueue tableLock = locking.getTableLock(table);
715-
if (queue == null) {
716-
return true;
717-
}
718-
719-
if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
720-
// remove the table from the run-queue and the map
721-
if (AvlIterableList.isLinked(queue)) {
722-
tableRunQueue.remove(queue);
723-
}
724-
removeTableQueue(table);
725-
} else {
726-
// TODO: If there are no create, we can drop all the other ops
727-
return false;
728-
}
729-
} finally {
730-
schedUnlock();
731-
}
732-
return true;
733-
}
734-
735723
// ============================================================================
736724
// Region Locking Helpers
737725
// ============================================================================
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.BrokenBarrierException;
22+
import java.util.concurrent.CyclicBarrier;
23+
import org.apache.hadoop.hbase.HBaseClassTestRule;
24+
import org.apache.hadoop.hbase.HBaseTestingUtility;
25+
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
26+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
27+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
28+
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
29+
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
30+
import org.apache.hadoop.hbase.testclassification.LargeTests;
31+
import org.apache.hadoop.hbase.testclassification.MasterTests;
32+
import org.junit.AfterClass;
33+
import org.junit.BeforeClass;
34+
import org.junit.ClassRule;
35+
import org.junit.Test;
36+
import org.junit.experimental.categories.Category;
37+
38+
/**
39+
* Testcase for HBASE-29380
40+
*/
41+
@Category({ MasterTests.class, LargeTests.class })
42+
public class TestProcedureWaitAndWake {
43+
44+
@ClassRule
45+
public static final HBaseClassTestRule CLASS_RULE =
46+
HBaseClassTestRule.forClass(TestProcedureWaitAndWake.class);
47+
48+
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
49+
50+
public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {
51+
52+
private final CyclicBarrier barrier;
53+
54+
private boolean passedBarrier;
55+
56+
public MyPeerProcedure() {
57+
this(null);
58+
}
59+
60+
public MyPeerProcedure(CyclicBarrier barrier) {
61+
super("1");
62+
this.barrier = barrier;
63+
}
64+
65+
@Override
66+
public PeerOperationType getPeerOperationType() {
67+
return PeerOperationType.REMOVE;
68+
}
69+
70+
@Override
71+
protected LockState acquireLock(MasterProcedureEnv env) {
72+
// make sure we have two procedure arrive here at the same time, so one of them will enter the
73+
// lock wait state
74+
if (!passedBarrier) {
75+
try {
76+
barrier.await();
77+
} catch (InterruptedException | BrokenBarrierException e) {
78+
throw new RuntimeException(e);
79+
}
80+
passedBarrier = true;
81+
}
82+
return super.acquireLock(env);
83+
}
84+
85+
@Override
86+
protected Flow executeFromState(MasterProcedureEnv env, Integer state)
87+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
88+
if (state.intValue() == 0) {
89+
setNextState(1);
90+
addChildProcedure(new MySubPeerProcedure());
91+
return Flow.HAS_MORE_STATE;
92+
} else {
93+
Thread.sleep(200);
94+
return Flow.NO_MORE_STATE;
95+
}
96+
}
97+
98+
@Override
99+
protected Integer getState(int stateId) {
100+
return Integer.valueOf(stateId);
101+
}
102+
103+
@Override
104+
protected int getStateId(Integer state) {
105+
return state.intValue();
106+
}
107+
108+
@Override
109+
protected Integer getInitialState() {
110+
return 0;
111+
}
112+
113+
@Override
114+
protected void rollbackState(MasterProcedureEnv env, Integer state)
115+
throws IOException, InterruptedException {
116+
throw new UnsupportedOperationException();
117+
}
118+
119+
public static final class MySubPeerProcedure
120+
extends StateMachineProcedure<MasterProcedureEnv, Integer> implements PeerProcedureInterface {
121+
122+
@Override
123+
public PeerOperationType getPeerOperationType() {
124+
return PeerOperationType.REFRESH;
125+
}
126+
127+
@Override
128+
protected Flow executeFromState(MasterProcedureEnv env, Integer state)
129+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
130+
return Flow.NO_MORE_STATE;
131+
}
132+
133+
@Override
134+
protected Integer getState(int stateId) {
135+
return Integer.valueOf(stateId);
136+
}
137+
138+
@Override
139+
protected int getStateId(Integer state) {
140+
return state.intValue();
141+
}
142+
143+
@Override
144+
protected Integer getInitialState() {
145+
return 0;
146+
}
147+
148+
@Override
149+
public String getPeerId() {
150+
return "1";
151+
}
152+
153+
@Override
154+
protected void rollbackState(MasterProcedureEnv env, Integer state)
155+
throws IOException, InterruptedException {
156+
throw new UnsupportedOperationException();
157+
}
158+
}
159+
}
160+
161+
@BeforeClass
162+
public static void setUp() throws Exception {
163+
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
164+
UTIL.startMiniCluster(3);
165+
}
166+
167+
@AfterClass
168+
public static void tearDown() throws Exception {
169+
UTIL.shutdownMiniCluster();
170+
}
171+
172+
@Test
173+
public void testPeerProcedure() {
174+
ProcedureExecutor<MasterProcedureEnv> procExec =
175+
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
176+
CyclicBarrier barrier = new CyclicBarrier(2);
177+
MyPeerProcedure p1 = new MyPeerProcedure(barrier);
178+
MyPeerProcedure p2 = new MyPeerProcedure(barrier);
179+
long id1 = procExec.submitProcedure(p1);
180+
long id2 = procExec.submitProcedure(p2);
181+
UTIL.waitFor(10000, () -> procExec.isFinished(id1));
182+
UTIL.waitFor(10000, () -> procExec.isFinished(id2));
183+
}
184+
}

0 commit comments

Comments
 (0)