@@ -79,13 +79,6 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark,
7979void FillDelete (ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
8080 FillDelete (partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
8181}
82- THeartbeatProcessor::THeartbeatProcessor (
83- const THashSet<TString>& sourceIdsWithHeartbeat,
84- const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat)
85- : SourceIdsWithHeartbeat(sourceIdsWithHeartbeat)
86- , SourceIdsByHeartbeat(sourceIdsByHeartbeat)
87- {
88- }
8982
9083void THeartbeatProcessor::ApplyHeartbeat (const TString& sourceId, const TRowVersion& version) {
9184 SourceIdsWithHeartbeat.insert (sourceId);
@@ -501,49 +494,101 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti
501494
502495// / THeartbeatEmitter
503496THeartbeatEmitter::THeartbeatEmitter (const TSourceIdStorage& storage)
504- : THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat)
505- , Storage(storage)
497+ : Storage(storage)
506498{
507499}
508500
509- void THeartbeatEmitter::Process (const TString& sourceId, const THeartbeat& heartbeat) {
510- Y_ABORT_UNLESS (Storage.InMemorySourceIds .contains (sourceId));
511- const auto & sourceIdInfo = Storage.InMemorySourceIds .at (sourceId);
501+ void THeartbeatEmitter::Process (const TString& sourceId, THeartbeat&& heartbeat) {
502+ auto it = Storage.InMemorySourceIds .find (sourceId);
503+ if (it != Storage.InMemorySourceIds .end () && it->second .LastHeartbeat ) {
504+ if (heartbeat.Version <= it->second .LastHeartbeat ->Version ) {
505+ return ;
506+ }
507+ }
512508
513- if (const auto & lastHeartbeat = sourceIdInfo. LastHeartbeat ) {
514- ForgetHeartbeat (sourceId, lastHeartbeat-> Version );
509+ if (!Storage. SourceIdsWithHeartbeat . contains (sourceId) ) {
510+ NewSourceIdsWithHeartbeat. insert (sourceId);
515511 }
516512
517- if (LastHeartbeats .contains (sourceId)) {
518- ForgetHeartbeat (sourceId, LastHeartbeats .at (sourceId).Version );
513+ if (Heartbeats .contains (sourceId)) {
514+ ForgetHeartbeat (sourceId, Heartbeats .at (sourceId).Version );
519515 }
520516
521517 ApplyHeartbeat (sourceId, heartbeat.Version );
522- LastHeartbeats [sourceId] = heartbeat;
518+ Heartbeats [sourceId] = std::move ( heartbeat) ;
523519}
524520
525521TMaybe<THeartbeat> THeartbeatEmitter::CanEmit () const {
526- if (SourceIdsWithHeartbeat. size () != Storage.ExplicitSourceIds .size ()) {
522+ if (Storage. ExplicitSourceIds . size () != ( Storage.SourceIdsWithHeartbeat .size () + NewSourceIdsWithHeartbeat. size () )) {
527523 return Nothing ();
528524 }
529525
530526 if (SourceIdsByHeartbeat.empty ()) {
531527 return Nothing ();
532528 }
533529
534- auto it = SourceIdsByHeartbeat.begin ();
535- if (Storage.SourceIdsByHeartbeat .empty () || it->first > Storage.SourceIdsByHeartbeat .begin ()->first ) {
536- Y_ABORT_UNLESS (!it->second .empty ());
537- const auto & someSourceId = *it->second .begin ();
530+ if (!NewSourceIdsWithHeartbeat.empty ()) { // just got quorum
531+ if (!Storage.SourceIdsByHeartbeat .empty () && Storage.SourceIdsByHeartbeat .begin ()->first < SourceIdsByHeartbeat.begin ()->first ) {
532+ return GetFromStorage (Storage.SourceIdsByHeartbeat .begin ());
533+ } else {
534+ return GetFromDiff (SourceIdsByHeartbeat.begin ());
535+ }
536+ } else if (SourceIdsByHeartbeat.begin ()->first > Storage.SourceIdsByHeartbeat .begin ()->first ) {
537+ auto storage = Storage.SourceIdsByHeartbeat .begin ();
538+ auto diff = SourceIdsByHeartbeat.begin ();
539+
540+ TMaybe<TRowVersion> newVersion;
541+ while (storage != Storage.SourceIdsByHeartbeat .end ()) {
542+ const auto & [version, sourceIds] = *storage;
543+
544+ auto rest = sourceIds.size ();
545+ for (const auto & sourceId : sourceIds) {
546+ auto it = Heartbeats.find (sourceId);
547+ if (it != Heartbeats.end () && it->second .Version > version && version <= diff->first ) {
548+ --rest;
549+ } else {
550+ break ;
551+ }
552+ }
538553
539- if (LastHeartbeats.contains (someSourceId)) {
540- return LastHeartbeats.at (someSourceId);
541- } else if (Storage.InMemorySourceIds .contains (someSourceId)) {
542- return Storage.InMemorySourceIds .at (someSourceId).LastHeartbeat ;
554+ if (!rest) {
555+ if (++storage != Storage.SourceIdsByHeartbeat .end ()) {
556+ newVersion = storage->first ;
557+ } else {
558+ newVersion = diff->first ;
559+ }
560+ } else {
561+ break ;
562+ }
563+ }
564+
565+ if (newVersion) {
566+ storage = Storage.SourceIdsByHeartbeat .find (*newVersion);
567+ if (storage != Storage.SourceIdsByHeartbeat .end ()) {
568+ return GetFromStorage (storage);
569+ } else {
570+ return GetFromDiff (diff);
571+ }
543572 }
544573 }
545574
546575 return Nothing ();
547576}
548577
578+ TMaybe<THeartbeat> THeartbeatEmitter::GetFromStorage (TSourceIdsByHeartbeat::const_iterator it) const {
579+ Y_ABORT_UNLESS (!it->second .empty ());
580+ const auto & someSourceId = *it->second .begin ();
581+
582+ Y_ABORT_UNLESS (Storage.InMemorySourceIds .contains (someSourceId));
583+ return Storage.InMemorySourceIds .at (someSourceId).LastHeartbeat ;
584+ }
585+
586+ TMaybe<THeartbeat> THeartbeatEmitter::GetFromDiff (TSourceIdsByHeartbeat::const_iterator it) const {
587+ Y_ABORT_UNLESS (!it->second .empty ());
588+ const auto & someSourceId = *it->second .begin ();
589+
590+ Y_ABORT_UNLESS (Heartbeats.contains (someSourceId));
591+ return Heartbeats.at (someSourceId);
592+ }
593+
549594}
0 commit comments