@@ -425,7 +425,7 @@ WALEntryFilter getWalEntryFilter() {
425
425
return walEntryFilter ;
426
426
}
427
427
428
- protected final void uncaughtException (Thread t , Throwable e , ReplicationSourceManager manager ,
428
+ private void uncaughtException (Thread t , Throwable e , ReplicationSourceManager manager ,
429
429
String peerId ) {
430
430
RSRpcServices .exitIfOOME (e );
431
431
LOG .error ("Unexpected exception in {} currentPath={}" , t .getName (), getCurrentPath (), e );
@@ -552,15 +552,10 @@ private void initialize() {
552
552
}
553
553
554
554
if (!this .isSourceActive ()) {
555
+ // this means the server is shutting down or the source is terminated, just give up
556
+ // initializing
555
557
setSourceStartupStatus (false );
556
- if (Thread .currentThread ().isInterrupted ()) {
557
- // If source is not running and thread is interrupted this means someone has tried to
558
- // remove this peer.
559
- return ;
560
- }
561
-
562
- retryStartup .set (!this .abortOnError );
563
- throw new IllegalStateException ("Source should be active." );
558
+ return ;
564
559
}
565
560
566
561
sleepMultiplier = 1 ;
@@ -582,15 +577,12 @@ private void initialize() {
582
577
}
583
578
584
579
if (!this .isSourceActive ()) {
580
+ // this means the server is shutting down or the source is terminated, just give up
581
+ // initializing
585
582
setSourceStartupStatus (false );
586
- if (Thread .currentThread ().isInterrupted ()) {
587
- // If source is not running and thread is interrupted this means someone has tried to
588
- // remove this peer.
589
- return ;
590
- }
591
- retryStartup .set (!this .abortOnError );
592
- throw new IllegalStateException ("Source should be active." );
583
+ return ;
593
584
}
585
+
594
586
LOG .info ("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}" , logPeerId (),
595
587
this .replicationQueueInfo .getQueueId (), logQueue .getNumQueues (), clusterId , peerClusterId );
596
588
initializeWALEntryFilter (peerClusterId );
0 commit comments