@@ -403,38 +403,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException
403
403
// TODO: use empty initial offsets for now, revisit when adding support for sync replication
404
404
ReplicationSourceInterface src =
405
405
createSource (new ReplicationQueueData (queueId , ImmutableMap .of ()), peer );
406
- // synchronized here to avoid race with preLogRoll where we add new log to source and also
406
+ // synchronized here to avoid race with postLogRoll where we add new log to source and also
407
407
// walsById.
408
408
ReplicationSourceInterface toRemove ;
409
- Map < String , NavigableSet < String >> wals = new HashMap <>() ;
409
+ ReplicationQueueData queueData ;
410
410
synchronized (latestPaths ) {
411
+ // Here we make a copy of all the remaining wal files and then delete them from the
412
+ // replication queue storage after releasing the lock. It is not safe to just remove the old
413
+ // map from walsById since later we may fail to update the replication queue storage, and when
414
+ // we retry next time, we can not know the wal files that needs to be set to the replication
415
+ // queue storage
416
+ ImmutableMap .Builder <String , ReplicationGroupOffset > builder = ImmutableMap .builder ();
417
+ synchronized (walsById ) {
418
+ walsById .get (queueId ).forEach ((group , wals ) -> {
419
+ if (!wals .isEmpty ()) {
420
+ builder .put (group , new ReplicationGroupOffset (wals .last (), -1 ));
421
+ }
422
+ });
423
+ }
424
+ queueData = new ReplicationQueueData (queueId , builder .build ());
425
+ src = createSource (queueData , peer );
411
426
toRemove = sources .put (peerId , src );
412
427
if (toRemove != null ) {
413
428
LOG .info ("Terminate replication source for " + toRemove .getPeerId ());
414
429
toRemove .terminate (terminateMessage );
415
430
toRemove .getSourceMetrics ().clear ();
416
431
}
417
- // Here we make a copy of all the remaining wal files and then delete them from the
418
- // replication queue storage after releasing the lock. It is not safe to just remove the old
419
- // map from walsById since later we may fail to delete them from the replication queue
420
- // storage, and when we retry next time, we can not know the wal files that need to be deleted
421
- // from the replication queue storage.
422
- walsById .get (queueId ).forEach ((k , v ) -> wals .put (k , new TreeSet <>(v )));
432
+ }
433
+ for (Map .Entry <String , ReplicationGroupOffset > entry : queueData .getOffsets ().entrySet ()) {
434
+ queueStorage .setOffset (queueId , entry .getKey (), entry .getValue (), Collections .emptyMap ());
423
435
}
424
436
LOG .info ("Startup replication source for " + src .getPeerId ());
425
437
src .startup ();
426
- for (NavigableSet <String > walsByGroup : wals .values ()) {
427
- // TODO: just need to reset the replication offset
428
- // for (String wal : walsByGroup) {
429
- // queueStorage.removeWAL(server.getServerName(), peerId, wal);
430
- // }
431
- }
432
438
synchronized (walsById ) {
433
- Map <String , NavigableSet <String >> oldWals = walsById .get (queueId );
434
- wals . forEach ((k , v ) -> {
435
- NavigableSet <String > walsByGroup = oldWals .get (k );
439
+ Map <String , NavigableSet <String >> wals = walsById .get (queueId );
440
+ queueData . getOffsets (). forEach ((group , offset ) -> {
441
+ NavigableSet <String > walsByGroup = wals .get (group );
436
442
if (walsByGroup != null ) {
437
- walsByGroup .removeAll ( v );
443
+ walsByGroup .headSet ( offset . getWal (), true ). clear ( );
438
444
}
439
445
});
440
446
}
@@ -457,13 +463,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException
457
463
}
458
464
459
465
private ReplicationSourceInterface createRefreshedSource (ReplicationQueueId queueId ,
460
- ReplicationPeer peer ) throws IOException {
461
- Map <String , ReplicationGroupOffset > offsets ;
462
- try {
463
- offsets = queueStorage .getOffsets (queueId );
464
- } catch (ReplicationException e ) {
465
- throw new IOException (e );
466
- }
466
+ ReplicationPeer peer ) throws IOException , ReplicationException {
467
+ Map <String , ReplicationGroupOffset > offsets = queueStorage .getOffsets (queueId );
467
468
return createSource (new ReplicationQueueData (queueId , ImmutableMap .copyOf (offsets )), peer );
468
469
}
469
470
@@ -473,7 +474,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu
473
474
* replication queue storage and only to enqueue all logs to the new replication source
474
475
* @param peerId the id of the replication peer
475
476
*/
476
- public void refreshSources (String peerId ) throws IOException {
477
+ public void refreshSources (String peerId ) throws ReplicationException , IOException {
477
478
String terminateMessage = "Peer " + peerId
478
479
+ " state or config changed. Will close the previous replication source and open a new one" ;
479
480
ReplicationPeer peer = replicationPeers .getPeer (peerId );
0 commit comments