Skip to content

HBASE-29380 Two concurrent remove peer requests may hang (#7077) #7093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.master.procedure;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -418,7 +420,9 @@ private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fa
// ============================================================================
private TableQueue getTableQueue(TableName tableName) {
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
if (node != null) return node;
if (node != null) {
return node;
}

node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
Expand All @@ -431,6 +435,21 @@ private void removeTableQueue(TableName tableName) {
locking.removeTableLock(tableName);
}

/**
 * Attempts to drop the scheduler queue of the given table through the shared
 * {@code tryCleanupQueue} helper, using {@code removeTableQueue} as the removal action.
 * <p>
 * The removal only happens when the table queue is empty, nothing is waiting on the table lock
 * and the table lock is not held; otherwise (e.g. a new create is pending) nothing is removed.
 * @param table     the name of the table that should be marked as deleted
 * @param procedure the procedure that is removing the table
 * @return {@code true} if there was no queue for the table or the queue has been removed,
 *         {@code false} if other operations are still pending for that table
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/(MasterProcedureScheduler.java|src/test/.*)")
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
  final boolean cleanedUp = tryCleanupQueue(table, procedure, () -> tableMap,
    TABLE_QUEUE_KEY_COMPARATOR, locking::getTableLock, tableRunQueue, this::removeTableQueue);
  return cleanedUp;
}

/**
 * Tells whether the given procedure implements {@code TableProcedureInterface}.
 * @param proc the procedure to inspect
 * @return {@code true} if {@code proc} is a {@code TableProcedureInterface} instance
 */
private static boolean isTableProcedure(Procedure<?> proc) {
  final boolean implementsTableInterface = TableProcedureInterface.class.isInstance(proc);
  return implementsTableInterface;
}
Expand Down Expand Up @@ -467,23 +486,10 @@ private void removeServerQueue(ServerName serverName) {
}

/**
 * Tries to remove the queue of the given server once {@code proc} has completed.
 * <p>
 * All the checks and the scheduler locking live in {@code tryCleanupQueue}: the queue is removed
 * only if it is empty, nothing waits on the server lock and the server lock is not held. The
 * older inline logic only looked at queue emptiness plus {@code tryExclusiveLock}, which ignored
 * procedures parked in the lock's waiting queue.
 * @param serverName the server whose queue should be cleaned up
 * @param proc       the procedure that has just completed; only used for the log message
 */
private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
  // Server queues are kept in several trees; the tree to search is chosen from the hash code of
  // the server name, and is resolved lazily so that it is read under the scheduler lock.
  tryCleanupQueue(serverName, proc,
    () -> serverBuckets[getBucketIndex(serverBuckets, serverName.hashCode())],
    SERVER_QUEUE_KEY_COMPARATOR, locking::getServerLock, serverRunQueue, this::removeServerQueue);
}

private static int getBucketIndex(Object[] buckets, int hashCode) {
Expand Down Expand Up @@ -516,23 +522,9 @@ private void removePeerQueue(String peerId) {
locking.removePeerLock(peerId);
}

private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
schedLock();
try {
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
if (queue == null) {
return;
}

final LockAndQueue lock = locking.getPeerLock(peerId);
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(peerRunQueue, queue,
() -> "clean up peer queue after " + procedure + " completed");
removePeerQueue(peerId);
}
} finally {
schedUnlock();
}
private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
tryCleanupQueue(peerId, procedure, () -> peerMap, PEER_QUEUE_KEY_COMPARATOR,
locking::getPeerLock, peerRunQueue, this::removePeerQueue);
}

private static boolean isPeerProcedure(Procedure<?> proc) {
Expand Down Expand Up @@ -560,6 +552,35 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
return proc instanceof MetaProcedureInterface;
}

/**
 * Shared implementation for removing the queue that belongs to {@code id}, executed while holding
 * the scheduler lock.
 * <p>
 * The queue is removed only when all of the following hold:
 * <ol>
 * <li>the queue itself is empty,</li>
 * <li>no procedure sits in the waiting queue of the related lock,</li>
 * <li>nobody holds the related lock (it may be held by someone else, usually a parent
 * procedure).</li>
 * </ol>
 * @param id          the key of the queue to clean up
 * @param proc        the completed procedure, only used to build the log message
 * @param getMap      supplies the root of the tree that should contain the queue
 * @param comparator  compares tree nodes against {@code id}
 * @param getLock     resolves the lock that belongs to {@code id}
 * @param runQueue    the run queue the queue may be linked into
 * @param removeQueue action that drops the queue for {@code id}
 * @return {@code true} if no queue exists for {@code id} or it has been removed, {@code false}
 *         if the queue had to be kept
 */
private <T extends Comparable<T>, TNode extends Queue<T>> boolean tryCleanupQueue(T id,
  Procedure<?> proc, Supplier<TNode> getMap, AvlKeyComparator<TNode> comparator,
  Function<T, LockAndQueue> getLock, FairQueue<T> runQueue, Consumer<T> removeQueue) {
  schedLock();
  try {
    final Queue<T> node = AvlTree.get(getMap.get(), id, comparator);
    if (node == null) {
      // nothing registered for this id, so there is nothing left to clean up
      return true;
    }

    final LockAndQueue idLock = getLock.apply(id);
    // Keep the queue if it still has work, if somebody waits for the lock, or if the lock is
    // currently held (possibly by one of our parent procedures).
    final boolean stillInUse =
      !node.isEmpty() || !idLock.isWaitingQueueEmpty() || idLock.isLocked();
    if (stillInUse) {
      return false;
    }

    // All conditions are met, it is safe to remove the queue.
    removeFromRunQueue(runQueue, node, () -> "clean up queue after " + proc + " completed");
    removeQueue.accept(id);
    return true;
  } finally {
    schedUnlock();
  }
}

// ============================================================================
// Table Locking Helpers
// ============================================================================
Expand Down Expand Up @@ -699,39 +720,6 @@ public void wakeTableSharedLock(final Procedure<?> procedure, final TableName ta
}
}

/**
 * Tries to remove the queue and the table-lock of the specified table. If there are new
 * operations pending (e.g. a new create), the remove will not be performed.
 * <p>
 * The work is delegated to {@code tryCleanupQueue} so that tables follow the same rules as the
 * server and peer queues: the queue is looked up without creating it, and it is removed only
 * when it is empty, nothing waits on the table lock and the table lock is not held.
 * @param table     the name of the table that should be marked as deleted
 * @param procedure the procedure that is removing the table
 * @return true if deletion succeeded, false otherwise meaning that there are other new operations
 *         pending for that table (e.g. a new create).
 */
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
  return tryCleanupQueue(table, procedure, () -> tableMap, TABLE_QUEUE_KEY_COMPARATOR,
    locking::getTableLock, tableRunQueue, this::removeTableQueue);
}

// ============================================================================
// Region Locking Helpers
// ============================================================================
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
* Testcase for HBASE-29380
*/
@Category({ MasterTests.class, LargeTests.class })
public class TestProcedureWaitAndWake {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestProcedureWaitAndWake.class);

private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {

private final CyclicBarrier barrier;

private boolean passedBarrier;

public MyPeerProcedure() {
this(null);
}

public MyPeerProcedure(CyclicBarrier barrier) {
super("1");
this.barrier = barrier;
}

@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.REMOVE;
}

@Override
protected LockState acquireLock(MasterProcedureEnv env) {
// make sure we have two procedure arrive here at the same time, so one of them will enter the
// lock wait state
if (!passedBarrier) {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
passedBarrier = true;
}
return super.acquireLock(env);
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, Integer state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
if (state.intValue() == 0) {
setNextState(1);
addChildProcedure(new MySubPeerProcedure());
return Flow.HAS_MORE_STATE;
} else {
Thread.sleep(200);
return Flow.NO_MORE_STATE;
}
}

@Override
protected Integer getState(int stateId) {
return Integer.valueOf(stateId);
}

@Override
protected int getStateId(Integer state) {
return state.intValue();
}

@Override
protected Integer getInitialState() {
return 0;
}

@Override
protected void rollbackState(MasterProcedureEnv env, Integer state)
throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}

public static final class MySubPeerProcedure
extends StateMachineProcedure<MasterProcedureEnv, Integer> implements PeerProcedureInterface {

@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.REFRESH;
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, Integer state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
return Flow.NO_MORE_STATE;
}

@Override
protected Integer getState(int stateId) {
return Integer.valueOf(stateId);
}

@Override
protected int getStateId(Integer state) {
return state.intValue();
}

@Override
protected Integer getInitialState() {
return 0;
}

@Override
public String getPeerId() {
return "1";
}

@Override
protected void rollbackState(MasterProcedureEnv env, Integer state)
throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
}
}

@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
UTIL.startMiniCluster(3);
}

@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}

@Test
public void testPeerProcedure() {
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
CyclicBarrier barrier = new CyclicBarrier(2);
MyPeerProcedure p1 = new MyPeerProcedure(barrier);
MyPeerProcedure p2 = new MyPeerProcedure(barrier);
long id1 = procExec.submitProcedure(p1);
long id2 = procExec.submitProcedure(p2);
UTIL.waitFor(10000, () -> procExec.isFinished(id1));
UTIL.waitFor(10000, () -> procExec.isFinished(id2));
}
}