17
17
*/
18
18
package org .apache .hadoop .hdfs .server .federation .router ;
19
19
20
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEALTH_MONITOR_TIMEOUT ;
21
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT ;
20
22
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEARTBEAT_INTERVAL_MS ;
21
23
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT ;
22
24
25
27
import java .net .InetSocketAddress ;
26
28
import java .net .URI ;
27
29
import java .util .Map ;
30
+ import java .util .concurrent .TimeUnit ;
28
31
29
32
import org .apache .hadoop .conf .Configuration ;
30
33
import org .apache .hadoop .ha .HAServiceProtocol ;
@@ -85,6 +88,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
85
88
private NNHAServiceTarget localTarget ;
86
89
/** Cache HA protocol. */
87
90
private HAServiceProtocol localTargetHAProtocol ;
91
+ /** Cache NN protocol. */
92
+ private NamenodeProtocol namenodeProtocol ;
93
+ /** Cache Client protocol. */
94
+ private ClientProtocol clientProtocol ;
88
95
/** RPC address for the namenode. */
89
96
private String rpcAddress ;
90
97
/** Service RPC address for the namenode. */
@@ -100,6 +107,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
100
107
101
108
private String resolvedHost ;
102
109
private String originalNnId ;
110
+
111
+ private int healthMonitorTimeoutMs = (int ) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT ;
112
+
103
113
/**
104
114
* Create a new Namenode status updater.
105
115
* @param resolver Namenode resolver service to handle NN registration.
@@ -211,6 +221,15 @@ protected void serviceInit(Configuration configuration) throws Exception {
211
221
DFS_ROUTER_HEARTBEAT_INTERVAL_MS ,
212
222
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT ));
213
223
224
+ long timeoutMs = conf .getTimeDuration (DFS_ROUTER_HEALTH_MONITOR_TIMEOUT ,
225
+ DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT , TimeUnit .MILLISECONDS );
226
+ if (timeoutMs < 0 ) {
227
+ LOG .warn ("Invalid value {} configured for {} should be greater than or equal to 0. " +
228
+ "Using value of : 0ms instead." , timeoutMs , DFS_ROUTER_HEALTH_MONITOR_TIMEOUT );
229
+ this .healthMonitorTimeoutMs = 0 ;
230
+ } else {
231
+ this .healthMonitorTimeoutMs = (int ) timeoutMs ;
232
+ }
214
233
215
234
super .serviceInit (configuration );
216
235
}
@@ -309,66 +328,26 @@ protected NamenodeStatusReport getNamenodeStatusReport() {
309
328
LOG .debug ("Probing NN at service address: {}" , serviceAddress );
310
329
311
330
URI serviceURI = new URI ("hdfs://" + serviceAddress );
312
- // Read the filesystem info from RPC (required)
313
- NamenodeProtocol nn = NameNodeProxies
314
- .createProxy (this .conf , serviceURI , NamenodeProtocol .class )
315
- .getProxy ();
316
331
317
- if (nn != null ) {
318
- NamespaceInfo info = nn .versionRequest ();
319
- if (info != null ) {
320
- report .setNamespaceInfo (info );
321
- }
322
- }
332
+ // Read the filesystem info from RPC (required)
333
+ updateNameSpaceInfoParameters (serviceURI , report );
323
334
if (!report .registrationValid ()) {
324
335
return report ;
325
336
}
326
337
327
338
// Check for safemode from the client protocol. Currently optional, but
328
339
// should be required at some point for QoS
329
- try {
330
- ClientProtocol client = NameNodeProxies
331
- .createProxy (this .conf , serviceURI , ClientProtocol .class )
332
- .getProxy ();
333
- if (client != null ) {
334
- boolean isSafeMode = client .setSafeMode (
335
- SafeModeAction .SAFEMODE_GET , false );
336
- report .setSafeMode (isSafeMode );
337
- }
338
- } catch (Exception e ) {
339
- LOG .error ("Cannot fetch safemode state for {}" , getNamenodeDesc (), e );
340
- }
340
+ updateSafeModeParameters (serviceURI , report );
341
341
342
342
// Read the stats from JMX (optional)
343
343
updateJMXParameters (webAddress , report );
344
344
345
- if (localTarget != null ) {
346
- // Try to get the HA status
347
- try {
348
- // Determine if NN is active
349
- // TODO: dynamic timeout
350
- if (localTargetHAProtocol == null ) {
351
- localTargetHAProtocol = localTarget .getHealthMonitorProxy (conf , 30 *1000 );
352
- LOG .debug ("Get HA status with address {}" , lifelineAddress );
353
- }
354
- HAServiceStatus status = localTargetHAProtocol .getServiceStatus ();
355
- report .setHAServiceState (status .getState ());
356
- } catch (Throwable e ) {
357
- if (e .getMessage ().startsWith ("HA for namenode is not enabled" )) {
358
- LOG .error ("HA for {} is not enabled" , getNamenodeDesc ());
359
- localTarget = null ;
360
- } else {
361
- // Failed to fetch HA status, ignoring failure
362
- LOG .error ("Cannot fetch HA status for {}: {}" ,
363
- getNamenodeDesc (), e .getMessage (), e );
364
- }
365
- localTargetHAProtocol = null ;
366
- }
367
- }
368
- } catch (IOException e ) {
345
+ // Try to get the HA status
346
+ updateHAStatusParameters (report );
347
+ } catch (IOException e ) {
369
348
LOG .error ("Cannot communicate with {}: {}" ,
370
349
getNamenodeDesc (), e .getMessage ());
371
- } catch (Throwable e ) {
350
+ } catch (Throwable e ) {
372
351
// Generic error that we don't know about
373
352
LOG .error ("Unexpected exception while communicating with {}: {}" ,
374
353
getNamenodeDesc (), e .getMessage (), e );
@@ -399,6 +378,59 @@ private static String getNnHeartBeatServiceName(String nsId, String nnId) {
399
378
(nnId == null ? "" : " " + nnId );
400
379
}
401
380
381
+ /**
382
+ * Get the namespace information for a Namenode via RPC and add them to the report.
383
+ * @param serviceURI Server address of the Namenode to monitor.
384
+ * @param report Namenode status report updating with namespace information data.
385
+ * @throws IOException This method will throw IOException up, because RBF need
386
+ * use Namespace Info to identify this NS. If there are some IOExceptions,
387
+ * RBF doesn't need to get other information from NameNode,
388
+ * so throw IOException up.
389
+ */
390
+ private void updateNameSpaceInfoParameters (URI serviceURI ,
391
+ NamenodeStatusReport report ) throws IOException {
392
+ try {
393
+ if (this .namenodeProtocol == null ) {
394
+ this .namenodeProtocol = NameNodeProxies .createProxy (this .conf , serviceURI ,
395
+ NamenodeProtocol .class ).getProxy ();
396
+ }
397
+ if (namenodeProtocol != null ) {
398
+ NamespaceInfo info = namenodeProtocol .versionRequest ();
399
+ if (info != null ) {
400
+ report .setNamespaceInfo (info );
401
+ }
402
+ }
403
+ } catch (IOException e ) {
404
+ this .namenodeProtocol = null ;
405
+ throw e ;
406
+ }
407
+ }
408
+
409
+ /**
410
+ * Get the safemode information for a Namenode via RPC and add them to the report.
411
+ * Safemode is only one status of NameNode and is useless for RBF identify one NameNode.
412
+ * So If there are some IOExceptions, RBF can just ignore it and try to collect
413
+ * other information form namenode continue.
414
+ * @param serviceURI Server address of the Namenode to monitor.
415
+ * @param report Namenode status report updating with safemode information data.
416
+ */
417
+ private void updateSafeModeParameters (URI serviceURI , NamenodeStatusReport report ) {
418
+ try {
419
+ if (this .clientProtocol == null ) {
420
+ this .clientProtocol = NameNodeProxies
421
+ .createProxy (this .conf , serviceURI , ClientProtocol .class )
422
+ .getProxy ();
423
+ }
424
+ if (clientProtocol != null ) {
425
+ boolean isSafeMode = clientProtocol .setSafeMode (SafeModeAction .SAFEMODE_GET , false );
426
+ report .setSafeMode (isSafeMode );
427
+ }
428
+ } catch (Exception e ) {
429
+ LOG .error ("Cannot fetch safemode state for {}" , getNamenodeDesc (), e );
430
+ this .clientProtocol = null ;
431
+ }
432
+ }
433
+
402
434
/**
403
435
* Get the parameters for a Namenode from JMX and add them to the report.
404
436
* @param address Web interface of the Namenode to monitor.
@@ -415,6 +447,34 @@ private void updateJMXParameters(
415
447
}
416
448
}
417
449
450
+ /**
451
+ * Get the HA status for a Namenode via RPC and add them to the report.
452
+ * @param report Namenode status report updating with HA status information data.
453
+ */
454
+ private void updateHAStatusParameters (NamenodeStatusReport report ) {
455
+ if (localTarget != null ) {
456
+ try {
457
+ // Determine if NN is active
458
+ if (localTargetHAProtocol == null ) {
459
+ localTargetHAProtocol = localTarget .getHealthMonitorProxy (
460
+ conf , this .healthMonitorTimeoutMs );
461
+ LOG .debug ("Get HA status with address {}" , lifelineAddress );
462
+ }
463
+ HAServiceStatus status = localTargetHAProtocol .getServiceStatus ();
464
+ report .setHAServiceState (status .getState ());
465
+ } catch (Throwable e ) {
466
+ if (e .getMessage ().startsWith ("HA for namenode is not enabled" )) {
467
+ LOG .error ("HA for {} is not enabled" , getNamenodeDesc ());
468
+ localTarget = null ;
469
+ } else {
470
+ // Failed to fetch HA status, ignoring failure
471
+ LOG .error ("Cannot fetch HA status for {}" , getNamenodeDesc (), e );
472
+ }
473
+ localTargetHAProtocol = null ;
474
+ }
475
+ }
476
+ }
477
+
418
478
/**
419
479
* Fetches NamenodeInfo metrics from namenode.
420
480
* @param address Web interface of the Namenode to monitor.
0 commit comments