17
17
*/
18
18
package org .apache .hadoop .hbase .master .replication ;
19
19
20
+ import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER ;
20
21
import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER ;
22
+ import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER ;
21
23
import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER ;
22
24
import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE ;
23
25
import static org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProcedureProtos .MigrateReplicationQueueFromZkToTableState .MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE ;
@@ -111,6 +113,26 @@ private void shutdownExecutorService() {
111
113
}
112
114
}
113
115
116
+ private void disableReplicationLogCleaner (MasterProcedureEnv env )
117
+ throws ProcedureSuspendedException {
118
+ if (!env .getReplicationPeerManager ().getReplicationLogCleanerBarrier ().disable ()) {
119
+ // it is not likely that we can reach here as we will schedule this procedure immediately
120
+ // after master restarting, where ReplicationLogCleaner should have not started its first run
121
+ // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
122
+ // there will be no data in the new replication queue storage before we execute this procedure
123
+ // so ReplicationLogCleaner will quit immediately without doing anything.
124
+ throw suspend (env .getMasterConfiguration (),
125
+ backoff -> LOG .info (
126
+ "Can not disable replication log cleaner, sleep {} secs and retry later" ,
127
+ backoff / 1000 ));
128
+ }
129
+ resetRetry ();
130
+ }
131
+
132
+ private void enableReplicationLogCleaner (MasterProcedureEnv env ) {
133
+ env .getReplicationPeerManager ().getReplicationLogCleanerBarrier ().enable ();
134
+ }
135
+
114
136
private void waitUntilNoPeerProcedure (MasterProcedureEnv env ) throws ProcedureSuspendedException {
115
137
long peerProcCount ;
116
138
try {
@@ -136,6 +158,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
136
158
MigrateReplicationQueueFromZkToTableState state )
137
159
throws ProcedureSuspendedException , ProcedureYieldException , InterruptedException {
138
160
switch (state ) {
161
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER :
162
+ disableReplicationLogCleaner (env );
163
+ setNextState (MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE );
164
+ return Flow .HAS_MORE_STATE ;
139
165
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE :
140
166
waitUntilNoPeerProcedure (env );
141
167
List <ReplicationPeerDescription > peers = env .getReplicationPeerManager ().listPeers (null );
@@ -152,7 +178,8 @@ protected Flow executeFromState(MasterProcedureEnv env,
152
178
"failed to delete old replication queue data, sleep {} secs and retry later" ,
153
179
backoff / 1000 , e ));
154
180
}
155
- return Flow .NO_MORE_STATE ;
181
+ setNextState (MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER );
182
+ return Flow .HAS_MORE_STATE ;
156
183
}
157
184
// here we do not care the peers which have already been disabled, as later we do not need
158
185
// to enable them
@@ -232,6 +259,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
232
259
for (String peerId : disabledPeerIds ) {
233
260
addChildProcedure (new EnablePeerProcedure (peerId ));
234
261
}
262
+ setNextState (MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER );
263
+ return Flow .HAS_MORE_STATE ;
264
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER :
265
+ enableReplicationLogCleaner (env );
235
266
return Flow .NO_MORE_STATE ;
236
267
default :
237
268
throw new UnsupportedOperationException ("unhandled state=" + state );
@@ -263,7 +294,19 @@ protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
263
294
264
295
@ Override
265
296
protected MigrateReplicationQueueFromZkToTableState getInitialState () {
266
- return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE ;
297
+ return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER ;
298
+ }
299
+
300
+ @ Override
301
+ protected void afterReplay (MasterProcedureEnv env ) {
302
+ if (getCurrentState () == getInitialState ()) {
303
+ // do not need to disable log cleaner or acquire lock if we are in the initial state, later
304
+ // when executing the procedure we will try to disable and acquire.
305
+ return ;
306
+ }
307
+ if (!env .getReplicationPeerManager ().getReplicationLogCleanerBarrier ().disable ()) {
308
+ throw new IllegalStateException ("can not disable log cleaner, this should not happen" );
309
+ }
267
310
}
268
311
269
312
@ Override
0 commit comments