@@ -404,38 +404,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException
404
404
// TODO: use empty initial offsets for now, revisit when adding support for sync replication
405
405
ReplicationSourceInterface src =
406
406
createSource (new ReplicationQueueData (queueId , ImmutableMap .of ()), peer );
407
- // synchronized here to avoid race with preLogRoll where we add new log to source and also
407
+ // synchronized here to avoid race with postLogRoll where we add new log to source and also
408
408
// walsById.
409
409
ReplicationSourceInterface toRemove ;
410
- Map < String , NavigableSet < String >> wals = new HashMap <>() ;
410
+ ReplicationQueueData queueData ;
411
411
synchronized (latestPaths ) {
412
+ // Here we make a copy of all the remaining wal files and then delete them from the
413
+ // replication queue storage after releasing the lock. It is not safe to just remove the old
414
+ // map from walsById since later we may fail to update the replication queue storage, and when
415
+ // we retry next time, we can not know the wal files that needs to be set to the replication
416
+ // queue storage
417
+ ImmutableMap .Builder <String , ReplicationGroupOffset > builder = ImmutableMap .builder ();
418
+ synchronized (walsById ) {
419
+ walsById .get (queueId ).forEach ((group , wals ) -> {
420
+ if (!wals .isEmpty ()) {
421
+ builder .put (group , new ReplicationGroupOffset (wals .last (), -1 ));
422
+ }
423
+ });
424
+ }
425
+ queueData = new ReplicationQueueData (queueId , builder .build ());
426
+ src = createSource (queueData , peer );
412
427
toRemove = sources .put (peerId , src );
413
428
if (toRemove != null ) {
414
429
LOG .info ("Terminate replication source for " + toRemove .getPeerId ());
415
430
toRemove .terminate (terminateMessage );
416
431
toRemove .getSourceMetrics ().clear ();
417
432
}
418
- // Here we make a copy of all the remaining wal files and then delete them from the
419
- // replication queue storage after releasing the lock. It is not safe to just remove the old
420
- // map from walsById since later we may fail to delete them from the replication queue
421
- // storage, and when we retry next time, we can not know the wal files that need to be deleted
422
- // from the replication queue storage.
423
- walsById .get (queueId ).forEach ((k , v ) -> wals .put (k , new TreeSet <>(v )));
433
+ }
434
+ for (Map .Entry <String , ReplicationGroupOffset > entry : queueData .getOffsets ().entrySet ()) {
435
+ queueStorage .setOffset (queueId , entry .getKey (), entry .getValue (), Collections .emptyMap ());
424
436
}
425
437
LOG .info ("Startup replication source for " + src .getPeerId ());
426
438
src .startup ();
427
- for (NavigableSet <String > walsByGroup : wals .values ()) {
428
- // TODO: just need to reset the replication offset
429
- // for (String wal : walsByGroup) {
430
- // queueStorage.removeWAL(server.getServerName(), peerId, wal);
431
- // }
432
- }
433
439
synchronized (walsById ) {
434
- Map <String , NavigableSet <String >> oldWals = walsById .get (queueId );
435
- wals . forEach ((k , v ) -> {
436
- NavigableSet <String > walsByGroup = oldWals .get (k );
440
+ Map <String , NavigableSet <String >> wals = walsById .get (queueId );
441
+ queueData . getOffsets (). forEach ((group , offset ) -> {
442
+ NavigableSet <String > walsByGroup = wals .get (group );
437
443
if (walsByGroup != null ) {
438
- walsByGroup .removeAll ( v );
444
+ walsByGroup .headSet ( offset . getWal (), true ). clear ( );
439
445
}
440
446
});
441
447
}
@@ -458,13 +464,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException
458
464
}
459
465
460
466
private ReplicationSourceInterface createRefreshedSource (ReplicationQueueId queueId ,
461
- ReplicationPeer peer ) throws IOException {
462
- Map <String , ReplicationGroupOffset > offsets ;
463
- try {
464
- offsets = queueStorage .getOffsets (queueId );
465
- } catch (ReplicationException e ) {
466
- throw new IOException (e );
467
- }
467
+ ReplicationPeer peer ) throws IOException , ReplicationException {
468
+ Map <String , ReplicationGroupOffset > offsets = queueStorage .getOffsets (queueId );
468
469
return createSource (new ReplicationQueueData (queueId , ImmutableMap .copyOf (offsets )), peer );
469
470
}
470
471
@@ -474,7 +475,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu
474
475
* replication queue storage and only to enqueue all logs to the new replication source
475
476
* @param peerId the id of the replication peer
476
477
*/
477
- public void refreshSources (String peerId ) throws IOException {
478
+ public void refreshSources (String peerId ) throws ReplicationException , IOException {
478
479
String terminateMessage = "Peer " + peerId
479
480
+ " state or config changed. Will close the previous replication source and open a new one" ;
480
481
ReplicationPeer peer = replicationPeers .getPeer (peerId );
0 commit comments