Skip to content

Commit 93b20a6

Browse files
committed
HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
1 parent 45589dd commit 93b20a6

File tree

6 files changed

+246
-6
lines changed

6 files changed

+246
-6
lines changed

hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ public enum LockedResourceType {
2626
TABLE,
2727
REGION,
2828
PEER,
29-
META
29+
META,
30+
GLOBAL
3031
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
/**
23+
* Procedure interface for global operations, such as migration.
24+
*/
25+
@InterfaceAudience.Private
26+
public interface GlobalProcedureInterface {
27+
28+
String getGlobalId();
29+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import org.apache.hadoop.hbase.procedure2.LockStatus;
21+
import org.apache.hadoop.hbase.procedure2.Procedure;
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
24+
@InterfaceAudience.Private
25+
public class GlobalQueue extends Queue<String> {
26+
27+
public GlobalQueue(String globalId, LockStatus lockStatus) {
28+
super(globalId, lockStatus);
29+
}
30+
31+
@Override
32+
boolean requireExclusiveLock(Procedure<?> proc) {
33+
return true;
34+
}
35+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java

Lines changed: 115 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.function.Function;
2424
import java.util.function.Supplier;
25+
import org.apache.hadoop.hbase.HConstants;
2526
import org.apache.hadoop.hbase.ServerName;
2627
import org.apache.hadoop.hbase.TableExistsException;
2728
import org.apache.hadoop.hbase.TableName;
@@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
9596
(n, k) -> n.compareKey((String) k);
9697
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
9798
(n, k) -> n.compareKey((TableName) k);
99+
private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
100+
(n, k) -> n.compareKey((String) k);
98101

99102
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
100103
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
101104
private final FairQueue<String> peerRunQueue = new FairQueue<>();
102105
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
106+
private final FairQueue<String> globalRunQueue = new FairQueue<>();
103107

104108
private final ServerQueue[] serverBuckets = new ServerQueue[128];
105109
private TableQueue tableMap = null;
106110
private PeerQueue peerMap = null;
107111
private MetaQueue metaMap = null;
112+
private GlobalQueue globalMap = null;
108113

109114
private final SchemaLocking locking;
110115

@@ -128,6 +133,8 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
128133
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
129134
} else if (isPeerProcedure(proc)) {
130135
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
136+
} else if (isGlobalProcedure(proc)) {
137+
doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
131138
} else {
132139
// TODO: at the moment we only have Table and Server procedures
133140
// if you are implementing a non-table/non-server procedure, you have two options: create
@@ -163,14 +170,19 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
163170

164171
@Override
165172
protected boolean queueHasRunnables() {
166-
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
167-
|| serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
173+
return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
174+
|| tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
175+
|| peerRunQueue.hasRunnables();
168176
}
169177

170178
@Override
171179
protected Procedure dequeue() {
172-
// meta procedure is always the first priority
173-
Procedure<?> pollResult = doPoll(metaRunQueue);
180+
// pull global first
181+
Procedure<?> pollResult = doPoll(globalRunQueue);
182+
// then meta procedure
183+
if (pollResult == null) {
184+
pollResult = doPoll(metaRunQueue);
185+
}
174186
// For now, let server handling have precedence over table handling; presumption is that it
175187
// is more important handling crashed servers than it is running the
176188
// enabling/disabling tables, etc.
@@ -268,6 +280,14 @@ private void clearQueue() {
268280
clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
269281
peerMap = null;
270282

283+
// Remove Meta
284+
clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
285+
metaMap = null;
286+
287+
// Remove Global
288+
clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
289+
globalMap = null;
290+
271291
assert size() == 0 : "expected queue size to be 0, got " + size();
272292
}
273293

@@ -300,6 +320,7 @@ protected int queueSize() {
300320
count += queueSize(tableMap);
301321
count += queueSize(peerMap);
302322
count += queueSize(metaMap);
323+
count += queueSize(globalMap);
303324
return count;
304325
}
305326

@@ -502,6 +523,51 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
502523
return proc instanceof MetaProcedureInterface;
503524
}
504525

526+
// ============================================================================
527+
// Global Queue Lookup Helpers
528+
// ============================================================================
529+
private GlobalQueue getGlobalQueue(String globalId) {
530+
GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
531+
if (node != null) {
532+
return node;
533+
}
534+
node = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
535+
globalMap = AvlTree.insert(globalMap, node);
536+
return node;
537+
}
538+
539+
private void removeGlobalQueue(String globalId) {
540+
globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
541+
locking.removeGlobalLock(globalId);
542+
}
543+
544+
private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
545+
schedLock();
546+
try {
547+
GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
548+
if (queue == null) {
549+
return;
550+
}
551+
552+
final LockAndQueue lock = locking.getGlobalLock(globalId);
553+
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
554+
removeFromRunQueue(globalRunQueue, queue,
555+
() -> "clean up global queue after " + procedure + " completed");
556+
removeGlobalQueue(globalId);
557+
}
558+
} finally {
559+
schedUnlock();
560+
}
561+
}
562+
563+
private static boolean isGlobalProcedure(Procedure<?> proc) {
564+
return proc instanceof GlobalProcedureInterface;
565+
}
566+
567+
private static String getGlobalId(Procedure<?> proc) {
568+
return ((GlobalProcedureInterface) proc).getGlobalId();
569+
}
570+
505571
// ============================================================================
506572
// Table Locking Helpers
507573
// ============================================================================
@@ -1006,6 +1072,51 @@ public void wakeMetaExclusiveLock(Procedure<?> procedure) {
10061072
}
10071073
}
10081074

1075+
// ============================================================================
1076+
// Global Locking Helpers
1077+
// ============================================================================
1078+
/**
1079+
* Try to acquire the share lock on global.
1080+
* @see #wakeGlobalExclusiveLock(Procedure, String)
1081+
* @param procedure the procedure trying to acquire the lock
1082+
* @return true if the procedure has to wait for global to be available
1083+
*/
1084+
public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1085+
schedLock();
1086+
try {
1087+
final LockAndQueue lock = locking.getGlobalLock(globalId);
1088+
if (lock.tryExclusiveLock(procedure)) {
1089+
removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
1090+
() -> procedure + " held shared lock");
1091+
return false;
1092+
}
1093+
waitProcedure(lock, procedure);
1094+
logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING);
1095+
return true;
1096+
} finally {
1097+
schedUnlock();
1098+
}
1099+
}
1100+
1101+
/**
1102+
* Wake the procedures waiting for global.
1103+
* @see #waitGlobalExclusiveLock(Procedure, String)
1104+
* @param procedure the procedure releasing the lock
1105+
*/
1106+
public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1107+
schedLock();
1108+
try {
1109+
final LockAndQueue lock = locking.getGlobalLock(globalId);
1110+
lock.releaseExclusiveLock(procedure);
1111+
addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
1112+
() -> procedure + " released shared lock");
1113+
int waitingCount = wakeWaitingProcedures(lock);
1114+
wakePollIfNeeded(waitingCount);
1115+
} finally {
1116+
schedUnlock();
1117+
}
1118+
}
1119+
10091120
/**
10101121
* For debugging. Expensive.
10111122
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class SchemaLocking {
5353
// Single map for all regions irrespective of tables. Key is encoded region name.
5454
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
5555
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
56+
private final Map<String, LockAndQueue> globalLocks = new HashMap<>();
5657
private final LockAndQueue metaLock;
5758

5859
public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
@@ -94,6 +95,10 @@ LockAndQueue getMetaLock() {
9495
return metaLock;
9596
}
9697

98+
LockAndQueue getGlobalLock(String globalId) {
99+
return getLock(globalLocks, globalId);
100+
}
101+
97102
LockAndQueue removeRegionLock(String encodedRegionName) {
98103
return regionLocks.remove(encodedRegionName);
99104
}
@@ -114,6 +119,10 @@ LockAndQueue removePeerLock(String peerId) {
114119
return peerLocks.remove(peerId);
115120
}
116121

122+
LockAndQueue removeGlobalLock(String globalId) {
123+
return globalLocks.remove(globalId);
124+
}
125+
117126
private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
118127
LockAndQueue queue) {
119128
LockType lockType;
@@ -164,6 +173,8 @@ List<LockedResource> getLocks() {
164173
addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
165174
addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock),
166175
tn -> tn.getNameAsString(), LockedResourceType.META);
176+
addToLockedResources(lockedResources, globalLocks, Function.identity(),
177+
LockedResourceType.GLOBAL);
167178
return lockedResources;
168179
}
169180

@@ -191,6 +202,10 @@ LockedResource getLockResource(LockedResourceType resourceType, String resourceN
191202
break;
192203
case META:
193204
queue = metaLock;
205+
break;
206+
case GLOBAL:
207+
queue = globalLocks.get(resourceName);
208+
break;
194209
default:
195210
queue = null;
196211
break;
@@ -216,7 +231,8 @@ public String toString() {
216231
+ filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks)
217232
+ ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks="
218233
+ filterUnlocked(this.peerLocks) + ", metaLocks="
219-
+ filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock));
234+
+ filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks="
235+
+ filterUnlocked(globalLocks);
220236
}
221237

222238
private String filterUnlocked(Map<?, LockAndQueue> locks) {

hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,21 @@ public PeerOperationType getPeerOperationType() {
940940
}
941941
}
942942

943+
public static class TestGlobalProcedure extends TestProcedure
944+
implements GlobalProcedureInterface {
945+
private final String globalId;
946+
947+
public TestGlobalProcedure(long procId, String globalId) {
948+
super(procId);
949+
this.globalId = globalId;
950+
}
951+
952+
@Override
953+
public String getGlobalId() {
954+
return globalId;
955+
}
956+
}
957+
943958
private static LockProcedure createLockProcedure(LockType lockType, long procId)
944959
throws Exception {
945960
LockProcedure procedure = new LockProcedure();
@@ -1093,6 +1108,39 @@ public void testListLocksPeer() throws Exception {
10931108
assertEquals(1, resource.getWaitingProcedures().size());
10941109
}
10951110

1111+
@Test
1112+
public void testListLocksGlobal() throws Exception {
1113+
String globalId = "1";
1114+
LockProcedure procedure = createExclusiveLockProcedure(4);
1115+
queue.waitGlobalExclusiveLock(procedure, globalId);
1116+
1117+
List<LockedResource> locks = queue.getLocks();
1118+
assertEquals(1, locks.size());
1119+
1120+
LockedResource resource = locks.get(0);
1121+
assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
1122+
assertExclusiveLock(resource, procedure);
1123+
assertTrue(resource.getWaitingProcedures().isEmpty());
1124+
1125+
// Try to acquire the exclusive lock again with same procedure
1126+
assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId));
1127+
1128+
// Try to acquire the exclusive lock again with new procedure
1129+
LockProcedure procedure2 = createExclusiveLockProcedure(5);
1130+
assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId));
1131+
1132+
// Same peerId, still only has 1 LockedResource
1133+
locks = queue.getLocks();
1134+
assertEquals(1, locks.size());
1135+
1136+
resource = locks.get(0);
1137+
assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
1138+
// LockedResource owner still is the origin procedure
1139+
assertExclusiveLock(resource, procedure);
1140+
// The new procedure should in the waiting list
1141+
assertEquals(1, resource.getWaitingProcedures().size());
1142+
}
1143+
10961144
@Test
10971145
public void testListLocksWaiting() throws Exception {
10981146
LockProcedure procedure1 = createExclusiveLockProcedure(1);

0 commit comments

Comments
 (0)