19
19
20
20
import static org .apache .hadoop .util .Time .monotonicNow ;
21
21
22
+ import com .google .common .collect .Lists ;
22
23
import java .io .IOException ;
23
24
import java .net .URI ;
24
25
import java .net .URL ;
25
26
import java .security .PrivilegedAction ;
26
27
import java .util .ArrayList ;
28
+ import java .util .HashMap ;
27
29
import java .util .List ;
30
+ import java .util .Map ;
28
31
import java .util .concurrent .*;
29
- import java .util .concurrent .atomic .AtomicInteger ;
30
32
31
33
import org .apache .hadoop .classification .InterfaceAudience ;
32
34
import org .apache .hadoop .conf .Configuration ;
@@ -67,19 +69,20 @@ public class StandbyCheckpointer {
67
69
private final Configuration conf ;
68
70
private final FSNamesystem namesystem ;
69
71
private long lastCheckpointTime ;
70
- private long lastUploadTime ;
71
72
private final CheckpointerThread thread ;
72
73
private final ThreadFactory uploadThreadFactory ;
73
74
private List <URL > activeNNAddresses ;
74
75
private URL myNNAddress ;
75
76
76
77
private final Object cancelLock = new Object ();
77
78
private Canceler canceler ;
78
- private boolean isPrimaryCheckPointer = true ;
79
79
80
80
// Keep track of how many checkpoints were canceled.
81
81
// This is for use in tests.
82
82
private static int canceledCount = 0 ;
83
+
84
+ // A map from NN url to the most recent image upload time.
85
+ private final HashMap <String , CheckpointReceiverEntry > checkpointReceivers ;
83
86
84
87
public StandbyCheckpointer (Configuration conf , FSNamesystem ns )
85
88
throws IOException {
@@ -89,8 +92,38 @@ public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
89
92
this .thread = new CheckpointerThread ();
90
93
this .uploadThreadFactory = new ThreadFactoryBuilder ().setDaemon (true )
91
94
.setNameFormat ("TransferFsImageUpload-%d" ).build ();
92
-
93
95
setNameNodeAddresses (conf );
96
+ this .checkpointReceivers = new HashMap <>();
97
+ for (URL address : activeNNAddresses ) {
98
+ this .checkpointReceivers .put (address .toString (),
99
+ new CheckpointReceiverEntry ());
100
+ }
101
+ }
102
+
103
+ private static final class CheckpointReceiverEntry {
104
+ private long lastUploadTime ;
105
+ private boolean isPrimary ;
106
+
107
+ CheckpointReceiverEntry () {
108
+ this .lastUploadTime = 0L ;
109
+ this .isPrimary = true ;
110
+ }
111
+
112
+ void setLastUploadTime (long lastUploadTime ) {
113
+ this .lastUploadTime = lastUploadTime ;
114
+ }
115
+
116
+ void setIsPrimary (boolean isPrimaryFor ) {
117
+ this .isPrimary = isPrimaryFor ;
118
+ }
119
+
120
+ long getLastUploadTime () {
121
+ return lastUploadTime ;
122
+ }
123
+
124
+ boolean isPrimary () {
125
+ return isPrimary ;
126
+ }
94
127
}
95
128
96
129
/**
@@ -158,7 +191,7 @@ public void triggerRollbackCheckpoint() {
158
191
thread .interrupt ();
159
192
}
160
193
161
- private void doCheckpoint (boolean sendCheckpoint ) throws InterruptedException , IOException {
194
+ private void doCheckpoint () throws InterruptedException , IOException {
162
195
assert canceler != null ;
163
196
final long txid ;
164
197
final NameNodeFile imageType ;
@@ -210,11 +243,6 @@ private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, I
210
243
namesystem .cpUnlock ();
211
244
}
212
245
213
- //early exit if we shouldn't actually send the checkpoint to the ANN
214
- if (!sendCheckpoint ){
215
- return ;
216
- }
217
-
218
246
// Upload the saved checkpoint back to the active
219
247
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
220
248
// than the expected number of tasks to run or queue up
@@ -224,56 +252,70 @@ private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, I
224
252
uploadThreadFactory );
225
253
// for right now, just match the upload to the nn address by convention. There is no need to
226
254
// directly tie them together by adding a pair class.
227
- List < Future <TransferFsImage .TransferResult >> uploads =
228
- new ArrayList < Future < TransferFsImage . TransferResult > >();
255
+ HashMap < String , Future <TransferFsImage .TransferResult >> uploads =
256
+ new HashMap < >();
229
257
for (final URL activeNNAddress : activeNNAddresses ) {
230
- Future <TransferFsImage .TransferResult > upload =
231
- executor .submit (new Callable <TransferFsImage .TransferResult >() {
232
- @ Override
233
- public TransferFsImage .TransferResult call ()
234
- throws IOException , InterruptedException {
235
- CheckpointFaultInjector .getInstance ().duringUploadInProgess ();
236
- return TransferFsImage .uploadImageFromStorage (activeNNAddress , conf , namesystem
237
- .getFSImage ().getStorage (), imageType , txid , canceler );
238
- }
239
- });
240
- uploads .add (upload );
258
+ // Upload image if at least 1 of 2 following conditions met:
259
+ // 1. has been quiet for long enough, try to contact the node.
260
+ // 2. this standby IS the primary checkpointer of target NN.
261
+ String addressString = activeNNAddress .toString ();
262
+ assert checkpointReceivers .containsKey (addressString );
263
+ CheckpointReceiverEntry receiverEntry =
264
+ checkpointReceivers .get (addressString );
265
+ long secsSinceLastUpload =
266
+ TimeUnit .MILLISECONDS .toSeconds (
267
+ monotonicNow () - receiverEntry .getLastUploadTime ());
268
+ boolean shouldUpload = receiverEntry .isPrimary () ||
269
+ secsSinceLastUpload >= checkpointConf .getQuietPeriod ();
270
+ if (shouldUpload ) {
271
+ Future <TransferFsImage .TransferResult > upload =
272
+ executor .submit (new Callable <TransferFsImage .TransferResult >() {
273
+ @ Override
274
+ public TransferFsImage .TransferResult call ()
275
+ throws IOException , InterruptedException {
276
+ CheckpointFaultInjector .getInstance ().duringUploadInProgess ();
277
+ return TransferFsImage .uploadImageFromStorage (activeNNAddress ,
278
+ conf , namesystem .getFSImage ().getStorage (), imageType , txid ,
279
+ canceler );
280
+ }
281
+ });
282
+ uploads .put (addressString , upload );
283
+ }
241
284
}
242
285
InterruptedException ie = null ;
243
- IOException ioe = null ;
244
- int i = 0 ;
245
- boolean success = false ;
246
- for (; i < uploads . size (); i ++) {
247
- Future <TransferFsImage .TransferResult > upload = uploads . get ( i );
286
+ List < IOException > ioes = Lists . newArrayList () ;
287
+ for ( Map . Entry < String , Future < TransferFsImage . TransferResult >> entry :
288
+ uploads . entrySet ()) {
289
+ String url = entry . getKey ();
290
+ Future <TransferFsImage .TransferResult > upload = entry . getValue ( );
248
291
try {
249
- // TODO should there be some smarts here about retries nodes that are not the active NN?
292
+ // TODO should there be some smarts here about retries nodes that
293
+ // are not the active NN?
294
+ CheckpointReceiverEntry receiverEntry = checkpointReceivers .get (url );
250
295
if (upload .get () == TransferFsImage .TransferResult .SUCCESS ) {
251
- success = true ;
252
- //avoid getting the rest of the results - we don't care since we had a successful upload
253
- break ;
296
+ receiverEntry .setLastUploadTime (monotonicNow ());
297
+ receiverEntry .setIsPrimary (true );
298
+ } else {
299
+ receiverEntry .setIsPrimary (false );
254
300
}
255
-
256
301
} catch (ExecutionException e ) {
257
- ioe = new IOException ("Exception during image upload" , e );
258
- break ;
302
+ // Even if exception happens, still proceeds to next NN url.
303
+ // so that fail to upload to previous NN does not cause the
304
+ // remaining NN not getting the fsImage.
305
+ ioes .add (new IOException ("Exception during image upload" , e ));
259
306
} catch (InterruptedException e ) {
260
307
ie = e ;
261
308
break ;
262
309
}
263
310
}
264
- if (ie == null && ioe == null ) {
265
- //Update only when response from remote about success or
266
- lastUploadTime = monotonicNow ();
267
- // we are primary if we successfully updated the ANN
268
- this .isPrimaryCheckPointer = success ;
269
- }
270
311
// cleaner than copying code for multiple catch statements and better than catching all
271
312
// exceptions, so we just handle the ones we expect.
272
- if (ie != null || ioe != null ) {
313
+ if (ie != null ) {
273
314
274
315
// cancel the rest of the tasks, and close the pool
275
- for (; i < uploads .size (); i ++) {
276
- Future <TransferFsImage .TransferResult > upload = uploads .get (i );
316
+ for (Map .Entry <String , Future <TransferFsImage .TransferResult >> entry :
317
+ uploads .entrySet ()) {
318
+ Future <TransferFsImage .TransferResult > upload = entry .getValue ();
277
319
// The background thread may be blocked waiting in the throttler, so
278
320
// interrupt it.
279
321
upload .cancel (true );
@@ -286,11 +328,11 @@ public TransferFsImage.TransferResult call()
286
328
executor .awaitTermination (500 , TimeUnit .MILLISECONDS );
287
329
288
330
// re-throw the exception we got, since one of these two must be non-null
289
- if ( ie != null ) {
290
- throw ie ;
291
- } else if ( ioe != null ) {
292
- throw ioe ;
293
- }
331
+ throw ie ;
332
+ }
333
+
334
+ if (! ioes . isEmpty ()) {
335
+ throw MultipleIOException . createIOException ( ioes );
294
336
}
295
337
}
296
338
@@ -373,7 +415,6 @@ private void doWork() {
373
415
// Reset checkpoint time so that we don't always checkpoint
374
416
// on startup.
375
417
lastCheckpointTime = monotonicNow ();
376
- lastUploadTime = monotonicNow ();
377
418
while (shouldRun ) {
378
419
boolean needRollbackCheckpoint = namesystem .isNeedRollbackFsImage ();
379
420
if (!needRollbackCheckpoint ) {
@@ -426,10 +467,7 @@ private void doWork() {
426
467
427
468
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
428
469
// rollback request, are the checkpointer, are outside the quiet period.
429
- final long secsSinceLastUpload = (now - lastUploadTime ) / 1000 ;
430
- boolean sendRequest = isPrimaryCheckPointer
431
- || secsSinceLastUpload >= checkpointConf .getQuietPeriod ();
432
- doCheckpoint (sendRequest );
470
+ doCheckpoint ();
433
471
434
472
// reset needRollbackCheckpoint to false only when we finish a ckpt
435
473
// for rollback image
0 commit comments