39
39
import java .util .HashMap ;
40
40
import java .util .HashSet ;
41
41
import java .util .Map ;
42
+ import java .util .Objects ;
42
43
import java .util .OptionalLong ;
43
44
import java .util .Set ;
44
45
import java .util .function .Function ;
@@ -127,6 +128,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
127
128
*/
128
129
final Map <String , CheckpointState > checkpoints ;
129
130
131
+ /**
132
+ * A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on
133
+ * the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances
134
+ * due to an update from the primary.
135
+ */
136
+ private final LongConsumer onGlobalCheckpointUpdated ;
137
+
130
138
/**
131
139
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
132
140
* current global checkpoint.
@@ -391,7 +399,8 @@ public ReplicationTracker(
391
399
final ShardId shardId ,
392
400
final String allocationId ,
393
401
final IndexSettings indexSettings ,
394
- final long globalCheckpoint ) {
402
+ final long globalCheckpoint ,
403
+ final LongConsumer onGlobalCheckpointUpdated ) {
395
404
super (shardId , indexSettings );
396
405
assert globalCheckpoint >= SequenceNumbers .UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint ;
397
406
this .shardAllocationId = allocationId ;
@@ -400,6 +409,7 @@ public ReplicationTracker(
400
409
this .appliedClusterStateVersion = -1L ;
401
410
this .checkpoints = new HashMap <>(1 + indexSettings .getNumberOfReplicas ());
402
411
checkpoints .put (allocationId , new CheckpointState (SequenceNumbers .UNASSIGNED_SEQ_NO , globalCheckpoint , false , false ));
412
+ this .onGlobalCheckpointUpdated = Objects .requireNonNull (onGlobalCheckpointUpdated );
403
413
this .pendingInSync = new HashSet <>();
404
414
this .routingTable = null ;
405
415
this .replicationGroup = null ;
@@ -456,7 +466,10 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp
456
466
updateGlobalCheckpoint (
457
467
shardAllocationId ,
458
468
globalCheckpoint ,
459
- current -> logger .trace ("updating global checkpoint from [{}] to [{}] due to [{}]" , current , globalCheckpoint , reason ));
469
+ current -> {
470
+ logger .trace ("updated global checkpoint from [{}] to [{}] due to [{}]" , current , globalCheckpoint , reason );
471
+ onGlobalCheckpointUpdated .accept (globalCheckpoint );
472
+ });
460
473
assert invariant ();
461
474
}
462
475
@@ -474,7 +487,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI
474
487
allocationId ,
475
488
globalCheckpoint ,
476
489
current -> logger .trace (
477
- "updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]" ,
490
+ "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]" ,
478
491
allocationId ,
479
492
current ,
480
493
globalCheckpoint ));
@@ -485,8 +498,8 @@ private void updateGlobalCheckpoint(final String allocationId, final long global
485
498
final CheckpointState cps = checkpoints .get (allocationId );
486
499
assert !this .shardAllocationId .equals (allocationId ) || cps != null ;
487
500
if (cps != null && globalCheckpoint > cps .globalCheckpoint ) {
488
- ifUpdated .accept (cps .globalCheckpoint );
489
501
cps .globalCheckpoint = globalCheckpoint ;
502
+ ifUpdated .accept (cps .globalCheckpoint );
490
503
}
491
504
}
492
505
@@ -737,8 +750,9 @@ private synchronized void updateGlobalCheckpointOnPrimary() {
737
750
assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint +
738
751
"] is lower than previous one [" + globalCheckpoint + "]" ;
739
752
if (globalCheckpoint != computedGlobalCheckpoint ) {
740
- logger .trace ("global checkpoint updated to [{}]" , computedGlobalCheckpoint );
741
753
cps .globalCheckpoint = computedGlobalCheckpoint ;
754
+ logger .trace ("updated global checkpoint to [{}]" , computedGlobalCheckpoint );
755
+ onGlobalCheckpointUpdated .accept (computedGlobalCheckpoint );
742
756
}
743
757
}
744
758
0 commit comments