@@ -530,55 +530,60 @@ void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat)
530530
531531TMaybe<THeartbeat> THeartbeatEmitter::CanEmit () const {
532532 if (Storage.ExplicitSourceIds .size () != (Storage.SourceIdsWithHeartbeat .size () + NewSourceIdsWithHeartbeat.size ())) {
533+ // there is no quorum
533534 return Nothing ();
534535 }
535536
536537 if (SourceIdsByHeartbeat.empty ()) {
538+ // there is no new heartbeats, nothing to emit
537539 return Nothing ();
538540 }
539541
540- if (!NewSourceIdsWithHeartbeat.empty ()) { // just got quorum
541- if (!Storage.SourceIdsByHeartbeat .empty () && Storage.SourceIdsByHeartbeat .begin ()->first < SourceIdsByHeartbeat.begin ()->first ) {
542+ if (Storage.SourceIdsByHeartbeat .empty ()) {
543+ // got quorum, memory state
544+ return GetFromDiff (SourceIdsByHeartbeat.begin ());
545+ }
546+
547+ if (!NewSourceIdsWithHeartbeat.empty ()) {
548+ // got quorum, mixed state
549+ if (Storage.SourceIdsByHeartbeat .begin ()->first < SourceIdsByHeartbeat.begin ()->first ) {
542550 return GetFromStorage (Storage.SourceIdsByHeartbeat .begin ());
543551 } else {
544552 return GetFromDiff (SourceIdsByHeartbeat.begin ());
545553 }
546- } else if (SourceIdsByHeartbeat.begin ()->first > Storage.SourceIdsByHeartbeat .begin ()->first ) {
547- auto storage = Storage.SourceIdsByHeartbeat .begin ();
548- auto diff = SourceIdsByHeartbeat.begin ();
549-
550- TMaybe<TRowVersion> newVersion;
551- while (storage != Storage.SourceIdsByHeartbeat .end ()) {
552- const auto & [version, sourceIds] = *storage;
553-
554- auto rest = sourceIds.size ();
555- for (const auto & sourceId : sourceIds) {
556- auto it = Heartbeats.find (sourceId);
557- if (it != Heartbeats.end () && it->second .Version > version && version <= diff->first ) {
558- --rest;
559- } else {
560- break ;
561- }
562- }
554+ }
563555
564- if (!rest) {
565- if (++storage != Storage.SourceIdsByHeartbeat .end ()) {
566- newVersion = storage->first ;
567- } else {
568- newVersion = diff->first ;
569- }
556+ TMaybe<TRowVersion> emitVersion;
557+
558+ for (auto it = Storage.SourceIdsByHeartbeat .begin (), end = Storage.SourceIdsByHeartbeat .end (); it != end; ++it) {
559+ const auto & [version, sourceIds] = *it;
560+ auto rest = sourceIds.size ();
561+
562+ for (const auto & sourceId : sourceIds) {
563+ if (Heartbeats.contains (sourceId) && Heartbeats.at (sourceId).Version > version) {
564+ --rest;
570565 } else {
571566 break ;
572567 }
573568 }
574569
575- if (newVersion) {
576- storage = Storage.SourceIdsByHeartbeat .find (*newVersion);
577- if (storage != Storage.SourceIdsByHeartbeat .end ()) {
578- return GetFromStorage (storage);
579- } else {
580- return GetFromDiff (diff);
581- }
570+ if (rest) {
571+ break ;
572+ }
573+
574+ if (auto next = std::next (it); next != end && next->first < SourceIdsByHeartbeat.begin ()->first ) {
575+ emitVersion = next->first ;
576+ } else {
577+ emitVersion = SourceIdsByHeartbeat.begin ()->first ;
578+ break ;
579+ }
580+ }
581+
582+ if (emitVersion) {
583+ if (auto it = Storage.SourceIdsByHeartbeat .find (*emitVersion); it != Storage.SourceIdsByHeartbeat .end ()) {
584+ return GetFromStorage (it);
585+ } else {
586+ return GetFromDiff (SourceIdsByHeartbeat.begin ());
582587 }
583588 }
584589
0 commit comments