18
18
*/
19
19
package org .apache .hadoop .hbase .replication .regionserver ;
20
20
21
+ import com .google .common .annotations .VisibleForTesting ;
21
22
import com .google .common .collect .Lists ;
22
23
import com .google .common .util .concurrent .ListenableFuture ;
23
24
import com .google .common .util .concurrent .Service ;
@@ -439,14 +440,30 @@ public String getPeerClusterId() {
439
440
}
440
441
441
442
@ Override
443
+ @ VisibleForTesting
442
444
public Path getCurrentPath () {
443
- // only for testing
444
445
for (ReplicationSourceShipperThread worker : workerThreads .values ()) {
445
446
if (worker .getCurrentPath () != null ) return worker .getCurrentPath ();
446
447
}
447
448
return null ;
448
449
}
449
450
451
+ @ VisibleForTesting
452
+ public Path getLastLoggedPath () {
453
+ for (ReplicationSourceShipperThread worker : workerThreads .values ()) {
454
+ return worker .getLastLoggedPath ();
455
+ }
456
+ return null ;
457
+ }
458
+
459
+ @ VisibleForTesting
460
+ public long getLastLoggedPosition () {
461
+ for (ReplicationSourceShipperThread worker : workerThreads .values ()) {
462
+ return worker .getLastLoggedPosition ();
463
+ }
464
+ return 0 ;
465
+ }
466
+
450
467
private boolean isSourceActive () {
451
468
return !this .stopper .isStopped () && this .sourceRunning ;
452
469
}
@@ -481,8 +498,8 @@ public String getStats() {
481
498
for (Map .Entry <String , ReplicationSourceShipperThread > entry : workerThreads .entrySet ()) {
482
499
String walGroupId = entry .getKey ();
483
500
ReplicationSourceShipperThread worker = entry .getValue ();
484
- long position = worker .getCurrentPosition ();
485
- Path currentPath = worker .getCurrentPath ();
501
+ long position = worker .getLastLoggedPosition ();
502
+ Path currentPath = worker .getLastLoggedPath ();
486
503
sb .append ("walGroup [" ).append (walGroupId ).append ("]: " );
487
504
if (currentPath != null ) {
488
505
sb .append ("currently replicating from: " ).append (currentPath ).append (" at position: " )
@@ -517,7 +534,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
517
534
int queueSize = queues .get (walGroupId ).size ();
518
535
replicationDelay =
519
536
ReplicationLoad .calculateReplicationDelay (ageOfLastShippedOp , lastTimeStamp , queueSize );
520
- Path currentPath = worker .getCurrentPath ();
537
+ Path currentPath = worker .getLastLoggedPath ();
521
538
fileSize = -1 ;
522
539
if (currentPath != null ) {
523
540
try {
@@ -535,7 +552,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
535
552
.withQueueSize (queueSize )
536
553
.withWalGroup (walGroupId )
537
554
.withCurrentPath (currentPath )
538
- .withCurrentPosition (worker .getCurrentPosition ())
555
+ .withCurrentPosition (worker .getLastLoggedPosition ())
539
556
.withFileSize (fileSize )
540
557
.withAgeOfLastShippedOp (ageOfLastShippedOp )
541
558
.withReplicationDelay (replicationDelay );
@@ -555,7 +572,7 @@ public class ReplicationSourceShipperThread extends Thread {
555
572
// Last position in the log that we sent to ZooKeeper
556
573
private long lastLoggedPosition = -1 ;
557
574
// Last WAL path of the most recent entry batch handed to shipEdits
558
- private volatile Path currentPath ;
575
+ private volatile Path lastLoggedPath ;
559
576
// Current state of the worker thread
560
577
private WorkerState state ;
561
578
ReplicationSourceWALReaderThread entryReader ;
@@ -600,21 +617,19 @@ public void run() {
600
617
WALEntryBatch entryBatch = entryReader .take ();
601
618
shipEdits (entryBatch );
602
619
releaseBufferQuota ((int ) entryBatch .getHeapSize ());
603
- if (replicationQueueInfo .isQueueRecovered () && entryBatch .getWalEntries ().isEmpty ()
604
- && entryBatch .getLastSeqIds ().isEmpty ()) {
605
- LOG .debug ("Finished recovering queue for group " + walGroupId + " of peer "
606
- + peerClusterZnode );
620
+ if (!entryBatch .hasMoreEntries ()) {
621
+ LOG .debug ("Finished recovering queue for group "
622
+ + walGroupId + " of peer " + peerClusterZnode );
607
623
metrics .incrCompletedRecoveryQueue ();
608
624
setWorkerState (WorkerState .FINISHED );
609
- continue ;
610
625
}
611
626
} catch (InterruptedException e ) {
612
627
LOG .trace ("Interrupted while waiting for next replication entry batch" , e );
613
628
Thread .currentThread ().interrupt ();
614
629
}
615
630
}
616
631
617
- if (replicationQueueInfo . isQueueRecovered () && getWorkerState () == WorkerState .FINISHED ) {
632
+ if (getWorkerState () == WorkerState .FINISHED ) {
618
633
// use synchronize to make sure one last thread will clean the queue
619
634
synchronized (this ) {
620
635
Threads .sleep (100 );// wait a short while for other worker thread to fully exit
@@ -694,15 +709,13 @@ private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
694
709
protected void shipEdits (WALEntryBatch entryBatch ) {
695
710
List <Entry > entries = entryBatch .getWalEntries ();
696
711
long lastReadPosition = entryBatch .getLastWalPosition ();
697
- currentPath = entryBatch .getLastWalPath ();
712
+ lastLoggedPath = entryBatch .getLastWalPath ();
698
713
int sleepMultiplier = 0 ;
699
714
if (entries .isEmpty ()) {
700
- if (lastLoggedPosition != lastReadPosition ) {
701
- updateLogPosition (lastReadPosition );
702
- // if there was nothing to ship and it's not an error
703
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
704
- metrics .setAgeOfLastShippedOp (EnvironmentEdgeManager .currentTime (), walGroupId );
705
- }
715
+ updateLogPosition (lastReadPosition );
716
+ // if there was nothing to ship and it's not an error
717
+ // set "ageOfLastShippedOp" to <now> to indicate that we're current
718
+ metrics .setAgeOfLastShippedOp (EnvironmentEdgeManager .currentTime (), walGroupId );
706
719
return ;
707
720
}
708
721
int currentSize = (int ) entryBatch .getHeapSize ();
@@ -787,8 +800,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
787
800
}
788
801
789
802
private void updateLogPosition (long lastReadPosition ) {
790
- manager .setPendingShipment (false );
791
- manager .logPositionAndCleanOldLogs (currentPath , peerClusterZnode , lastReadPosition ,
803
+ manager .logPositionAndCleanOldLogs (lastLoggedPath , peerClusterZnode , lastReadPosition ,
792
804
this .replicationQueueInfo .isQueueRecovered (), false );
793
805
lastLoggedPosition = lastReadPosition ;
794
806
}
@@ -800,7 +812,7 @@ public void startup() {
800
812
public void uncaughtException (final Thread t , final Throwable e ) {
801
813
RSRpcServices .exitIfOOME (e );
802
814
LOG .error ("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
803
- + getCurrentPath (), e );
815
+ + getLastLoggedPath (), e );
804
816
stopper .stop ("Unexpected exception in ReplicationSourceWorkerThread" );
805
817
}
806
818
};
@@ -941,8 +953,12 @@ public Path getCurrentPath() {
941
953
return this .entryReader .getCurrentPath ();
942
954
}
943
955
944
- public long getCurrentPosition () {
945
- return this .lastLoggedPosition ;
956
+ public Path getLastLoggedPath () {
957
+ return lastLoggedPath ;
958
+ }
959
+
960
+ public long getLastLoggedPosition () {
961
+ return lastLoggedPosition ;
946
962
}
947
963
948
964
private boolean isWorkerActive () {
0 commit comments