@@ -209,14 +209,17 @@ public void enqueueLog(Path log) {
    } else {
      queue.put(log);
    }
-    LOG.trace("Added log file {} to queue of source {}.", logPrefix,
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
        this.replicationQueueInfo.getQueueId());
+    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
-      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
-        + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
+      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
+        + "replication.source.log.queue.warn: {}", logPeerId(),
+        logPrefix, queueSize, logQueueWarnThreshold);
    }
  }

@@ -232,8 +235,8 @@ public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Pat
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
-          + Bytes.toString(family) + " to peer id " + peerId);
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
      }
    } else {
      // user has explicitly not defined any table cfs for replication, means replicate all the
@@ -305,9 +308,14 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
-      LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
+          walGroupId);
+      }
    } else {
-      LOG.debug("Starting up worker for wal group {}", walGroupId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
+      }
      ReplicationSourceWALReader walReader =
        createNewWALReader(walGroupId, queue, worker.getStartPosition());
      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
@@ -337,7 +345,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
      }
    } else {
      currentPath = new Path("NO_LOGS_IN_QUEUE");
-      LOG.warn("No replication ongoing, waiting for new log");
+      LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
    }
    ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
    statusBuilder.withPeerId(this.getPeerId())
@@ -378,7 +386,8 @@ private ReplicationSourceWALReader createNewWALReader(String walGroupId,

  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
-    LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+    LOG.error("Unexpected exception in {} currentPath={}",
+      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

@@ -399,7 +408,7 @@ public void tryThrottle(int batchSize) throws InterruptedException {
    long sleepTicks = throttler.getNextSleepInterval(batchSize);
    if (sleepTicks > 0) {
      if (LOG.isTraceEnabled()) {
-        LOG.trace("To sleep " + sleepTicks + " ms for throttling control");
+        LOG.trace("{} To sleep {} ms for throttling control", logPeerId(), sleepTicks);
      }
      Thread.sleep(sleepTicks);
      // reset throttler's cycle start tick when sleep for throttling occurs
@@ -433,11 +442,14 @@ private long getCurrentBandwidth() {
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+        LOG.trace("{} {}, sleeping {} times {}",
+          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
+      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
@@ -450,7 +462,7 @@ private void initialize() {
    try {
      replicationEndpoint = createReplicationEndpoint();
    } catch (Exception e) {
-      LOG.warn("error creating ReplicationEndpoint, retry", e);
+      LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
      if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
        sleepMultiplier++;
      }
@@ -462,7 +474,7 @@ private void initialize() {
      this.replicationEndpoint = replicationEndpoint;
      break;
    } catch (Exception e) {
-      LOG.warn("Error starting ReplicationEndpoint, retry", e);
+      LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
      replicationEndpoint.stop();
      if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
        sleepMultiplier++;
@@ -480,8 +492,10 @@ private void initialize() {
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
-        LOG.debug("Could not connect to Peer ZK. Sleeping for "
-          + (this.sleepForRetries * sleepMultiplier) + " millis.");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
+            (this.sleepForRetries * sleepMultiplier));
+        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
@@ -499,8 +513,8 @@ private void initialize() {
      this.manager.removeSource(this);
      return;
    }
-    LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
-      this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
+    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
@@ -533,10 +547,10 @@ public void terminate(String reason, Exception cause) {

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
-      LOG.info("Closing source " + this.queueId + " because: " + reason);
+      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
-      LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
-        cause);
+      LOG.error("{} Closing source {} because an error occurred: {}",
+        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
@@ -560,7 +574,7 @@ public void terminate(String reason, Exception cause, boolean join) {
        // Wait worker to stop
        Thread.sleep(this.sleepForRetries);
      } catch (InterruptedException e) {
-        LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
+        LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
        Thread.currentThread().interrupt();
      }
      // If worker still is alive after waiting, interrupt it
@@ -581,15 +595,15 @@ public void terminate(String reason, Exception cause, boolean join) {
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
-        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
+        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
-          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +
-            this.queueId, te);
+          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
+            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
@@ -721,4 +735,8 @@ ReplicationQueueStorage getQueueStorage() {
  void removeWorker(ReplicationSourceShipper worker) {
    workerThreads.remove(worker.walGroupId, worker);
  }
+
+  private String logPeerId(){
+    return "[Source for peer " + this.getPeer().getId() + "]:";
+  }
}
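
For background on the SLF4J pattern this patch applies throughout (a peer-id prefix built by the new logPeerId() helper plus parameterized messages), here is a minimal standalone sketch. The class name and peer id below are hypothetical stand-ins, not part of the HBase source:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(PeerLoggingSketch.class);

  // Hypothetical stand-in for the peer id the real source gets from getPeer().getId().
  private final String peerId = "examplePeer";

  // Mirrors the logPeerId() helper added at the end of this diff.
  private String logPeerId() {
    return "[Source for peer " + peerId + "]:";
  }

  void demo(Exception cause) {
    // The {} placeholders are substituted only when the level is enabled,
    // so the message string is never built for disabled levels.
    LOG.debug("{} Starting up worker for wal group {}", logPeerId(), "group-0");

    // With SLF4J 1.6+, a Throwable passed after the placeholder arguments is
    // treated as the exception and logged with its stack trace.
    LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), cause);
  }
}

Note that the explicit isDebugEnabled()/isTraceEnabled() guards in the patch are still worthwhile even with placeholders: they skip evaluating the arguments themselves, here the string concatenation inside logPeerId(), when the level is off.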