22
22
import java .util .List ;
23
23
import java .util .function .Function ;
24
24
import java .util .function .Supplier ;
25
+ import org .apache .hadoop .hbase .HConstants ;
25
26
import org .apache .hadoop .hbase .ServerName ;
26
27
import org .apache .hadoop .hbase .TableExistsException ;
27
28
import org .apache .hadoop .hbase .TableName ;
@@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
95
96
(n , k ) -> n .compareKey ((String ) k );
96
97
private final static AvlKeyComparator <MetaQueue > META_QUEUE_KEY_COMPARATOR =
97
98
(n , k ) -> n .compareKey ((TableName ) k );
99
+ private final static AvlKeyComparator <GlobalQueue > GLOBAL_QUEUE_KEY_COMPARATOR =
100
+ (n , k ) -> n .compareKey ((String ) k );
98
101
99
102
private final FairQueue <ServerName > serverRunQueue = new FairQueue <>();
100
103
private final FairQueue <TableName > tableRunQueue = new FairQueue <>();
101
104
private final FairQueue <String > peerRunQueue = new FairQueue <>();
102
105
private final FairQueue <TableName > metaRunQueue = new FairQueue <>();
106
+ private final FairQueue <String > globalRunQueue = new FairQueue <>();
103
107
104
108
private final ServerQueue [] serverBuckets = new ServerQueue [128 ];
105
109
private TableQueue tableMap = null ;
106
110
private PeerQueue peerMap = null ;
107
111
private MetaQueue metaMap = null ;
112
+ private GlobalQueue globalMap = null ;
108
113
109
114
private final SchemaLocking locking ;
110
115
@@ -128,6 +133,8 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
128
133
doAdd (serverRunQueue , getServerQueue (spi .getServerName (), spi ), proc , addFront );
129
134
} else if (isPeerProcedure (proc )) {
130
135
doAdd (peerRunQueue , getPeerQueue (getPeerId (proc )), proc , addFront );
136
+ } else if (isGlobalProcedure (proc )) {
137
+ doAdd (globalRunQueue , getGlobalQueue (getGlobalId (proc )), proc , addFront );
131
138
} else {
132
139
// TODO: at the moment we only have Table and Server procedures
133
140
// if you are implementing a non-table/non-server procedure, you have two options: create
@@ -163,14 +170,19 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
163
170
164
171
@ Override
165
172
protected boolean queueHasRunnables () {
166
- return metaRunQueue .hasRunnables () || tableRunQueue .hasRunnables ()
167
- || serverRunQueue .hasRunnables () || peerRunQueue .hasRunnables ();
173
+ return globalRunQueue .hasRunnables () || metaRunQueue .hasRunnables ()
174
+ || tableRunQueue .hasRunnables () || serverRunQueue .hasRunnables ()
175
+ || peerRunQueue .hasRunnables ();
168
176
}
169
177
170
178
@ Override
171
179
protected Procedure dequeue () {
172
- // meta procedure is always the first priority
173
- Procedure <?> pollResult = doPoll (metaRunQueue );
180
+ // pull global first
181
+ Procedure <?> pollResult = doPoll (globalRunQueue );
182
+ // then meta procedure
183
+ if (pollResult == null ) {
184
+ pollResult = doPoll (metaRunQueue );
185
+ }
174
186
// For now, let server handling have precedence over table handling; presumption is that it
175
187
// is more important handling crashed servers than it is running the
176
188
// enabling/disabling tables, etc.
@@ -268,6 +280,14 @@ private void clearQueue() {
268
280
clear (peerMap , peerRunQueue , PEER_QUEUE_KEY_COMPARATOR );
269
281
peerMap = null ;
270
282
283
+ // Remove Meta
284
+ clear (metaMap , metaRunQueue , META_QUEUE_KEY_COMPARATOR );
285
+ metaMap = null ;
286
+
287
+ // Remove Global
288
+ clear (globalMap , globalRunQueue , GLOBAL_QUEUE_KEY_COMPARATOR );
289
+ globalMap = null ;
290
+
271
291
assert size () == 0 : "expected queue size to be 0, got " + size ();
272
292
}
273
293
@@ -300,6 +320,7 @@ protected int queueSize() {
300
320
count += queueSize (tableMap );
301
321
count += queueSize (peerMap );
302
322
count += queueSize (metaMap );
323
+ count += queueSize (globalMap );
303
324
return count ;
304
325
}
305
326
@@ -502,6 +523,51 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
502
523
return proc instanceof MetaProcedureInterface ;
503
524
}
504
525
526
+ // ============================================================================
527
+ // Global Queue Lookup Helpers
528
+ // ============================================================================
529
+ private GlobalQueue getGlobalQueue (String globalId ) {
530
+ GlobalQueue node = AvlTree .get (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
531
+ if (node != null ) {
532
+ return node ;
533
+ }
534
+ node = new GlobalQueue (globalId , locking .getGlobalLock (globalId ));
535
+ globalMap = AvlTree .insert (globalMap , node );
536
+ return node ;
537
+ }
538
+
539
+ private void removeGlobalQueue (String globalId ) {
540
+ globalMap = AvlTree .remove (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
541
+ locking .removeGlobalLock (globalId );
542
+ }
543
+
544
+ private void tryCleanupGlobalQueue (String globalId , Procedure <?> procedure ) {
545
+ schedLock ();
546
+ try {
547
+ GlobalQueue queue = AvlTree .get (globalMap , globalId , GLOBAL_QUEUE_KEY_COMPARATOR );
548
+ if (queue == null ) {
549
+ return ;
550
+ }
551
+
552
+ final LockAndQueue lock = locking .getGlobalLock (globalId );
553
+ if (queue .isEmpty () && lock .tryExclusiveLock (procedure )) {
554
+ removeFromRunQueue (globalRunQueue , queue ,
555
+ () -> "clean up global queue after " + procedure + " completed" );
556
+ removeGlobalQueue (globalId );
557
+ }
558
+ } finally {
559
+ schedUnlock ();
560
+ }
561
+ }
562
+
563
+ private static boolean isGlobalProcedure (Procedure <?> proc ) {
564
+ return proc instanceof GlobalProcedureInterface ;
565
+ }
566
+
567
+ private static String getGlobalId (Procedure <?> proc ) {
568
+ return ((GlobalProcedureInterface ) proc ).getGlobalId ();
569
+ }
570
+
505
571
// ============================================================================
506
572
// Table Locking Helpers
507
573
// ============================================================================
@@ -1006,6 +1072,51 @@ public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1006
1072
}
1007
1073
}
1008
1074
1075
+ // ============================================================================
1076
+ // Global Locking Helpers
1077
+ // ============================================================================
1078
+ /**
1079
+ * Try to acquire the share lock on global.
1080
+ * @see #wakeGlobalExclusiveLock(Procedure, String)
1081
+ * @param procedure the procedure trying to acquire the lock
1082
+ * @return true if the procedure has to wait for global to be available
1083
+ */
1084
+ public boolean waitGlobalExclusiveLock (Procedure <?> procedure , String globalId ) {
1085
+ schedLock ();
1086
+ try {
1087
+ final LockAndQueue lock = locking .getGlobalLock (globalId );
1088
+ if (lock .tryExclusiveLock (procedure )) {
1089
+ removeFromRunQueue (globalRunQueue , getGlobalQueue (globalId ),
1090
+ () -> procedure + " held shared lock" );
1091
+ return false ;
1092
+ }
1093
+ waitProcedure (lock , procedure );
1094
+ logLockedResource (LockedResourceType .GLOBAL , HConstants .EMPTY_STRING );
1095
+ return true ;
1096
+ } finally {
1097
+ schedUnlock ();
1098
+ }
1099
+ }
1100
+
1101
+ /**
1102
+ * Wake the procedures waiting for global.
1103
+ * @see #waitGlobalExclusiveLock(Procedure, String)
1104
+ * @param procedure the procedure releasing the lock
1105
+ */
1106
+ public void wakeGlobalExclusiveLock (Procedure <?> procedure , String globalId ) {
1107
+ schedLock ();
1108
+ try {
1109
+ final LockAndQueue lock = locking .getGlobalLock (globalId );
1110
+ lock .releaseExclusiveLock (procedure );
1111
+ addToRunQueue (globalRunQueue , getGlobalQueue (globalId ),
1112
+ () -> procedure + " released shared lock" );
1113
+ int waitingCount = wakeWaitingProcedures (lock );
1114
+ wakePollIfNeeded (waitingCount );
1115
+ } finally {
1116
+ schedUnlock ();
1117
+ }
1118
+ }
1119
+
1009
1120
/**
1010
1121
* For debugging. Expensive.
1011
1122
*/
0 commit comments