@@ -113,7 +113,9 @@ class TSessionInfo {
113113 ops->Send (ActorId, ev.Release ());
114114 }
115115
116- void SendWorkerDataEnd (IActorOps* ops, const TWorkerId& id, ui64 partitionId, const TVector<ui64>&& adjacentPartitionsIds, const TVector<ui64>&& childPartitionsIds) {
116+ void SendWorkerDataEnd (IActorOps* ops, const TWorkerId& id, ui64 partitionId,
117+ const TVector<ui64>&& adjacentPartitionsIds, const TVector<ui64>&& childPartitionsIds)
118+ {
117119 auto ev = MakeHolder<TEvService::TEvWorkerDataEnd>();
118120 auto & record = ev->Record ;
119121
@@ -129,18 +131,87 @@ class TSessionInfo {
129131 ops->Send (ActorId, ev.Release ());
130132 }
131133
134+ void Handle (IActorOps* ops, TEvService::TEvGetTxId::TPtr& ev) {
135+ TMap<TRowVersion, ui64> result;
136+ TVector<TRowVersion> versionsWithoutTxId;
137+
138+ for (const auto & v : ev->Get ()->Record .GetVersions ()) {
139+ const auto version = TRowVersion::FromProto (v);
140+ if (auto it = TxIds.upper_bound (version); it != TxIds.end ()) {
141+ result[it->first ] = it->second ;
142+ } else {
143+ versionsWithoutTxId.push_back (version);
144+ PendingTxId[version].insert (ev->Sender );
145+ }
146+ }
147+
148+ if (versionsWithoutTxId) {
149+ ops->Send (ActorId, new TEvService::TEvGetTxId (versionsWithoutTxId));
150+ }
151+
152+ if (result) {
153+ SendTxIdResult (ops, ev->Sender , result);
154+ }
155+ }
156+
157+ void Handle (IActorOps* ops, TEvService::TEvTxIdResult::TPtr& ev) {
158+ THashMap<TActorId, TMap<TRowVersion, ui64>> results;
159+
160+ for (const auto & kv : ev->Get ()->Record .GetVersionTxIds ()) {
161+ const auto version = TRowVersion::FromProto (kv.GetVersion ());
162+ TxIds.emplace (version, kv.GetTxId ());
163+
164+ for (auto it = PendingTxId.begin (); it != PendingTxId.end ();) {
165+ if (it->first >= version) {
166+ break ;
167+ }
168+
169+ for (const auto & actorId : it->second ) {
170+ results[actorId].emplace (version, kv.GetTxId ());
171+ }
172+
173+ PendingTxId.erase (it++);
174+ }
175+ }
176+
177+ for (const auto & [actorId, result] : results) {
178+ SendTxIdResult (ops, actorId, result);
179+ }
180+ }
181+
182+ void Handle (IActorOps* ops, TEvService::TEvHeartbeat::TPtr& ev) {
183+ GetWorkerId (ev->Sender ).Serialize (*ev->Get ()->Record .MutableWorker ());
184+ ops->Send (ActorId, ev->ReleaseBase ().Release (), ev->Flags , ev->Cookie );
185+ }
186+
132187 void Shutdown (IActorOps* ops) const {
133188 for (const auto & [_, actorId] : Workers) {
134189 ops->Send (actorId, new TEvents::TEvPoison ());
135190 }
136191 }
137192
193+ private:
194+ static void SendTxIdResult (IActorOps* ops, const TActorId& recipient, const TMap<TRowVersion, ui64>& result) {
195+ auto ev = MakeHolder<TEvService::TEvTxIdResult>();
196+
197+ for (const auto & [version, txId] : result) {
198+ auto & item = *ev->Record .AddVersionTxIds ();
199+ version.ToProto (item.MutableVersion ());
200+ item.SetTxId (txId);
201+ }
202+
203+ ops->Send (recipient, ev.Release ());
204+ }
205+
138206private:
139207 TActorId ActorId;
140208 ui64 Generation;
141209 THashMap<TWorkerId, TActorId> Workers;
142210 THashMap<TActorId, TWorkerId> ActorIdToWorkerId;
143211
212+ TMap<TRowVersion, ui64> TxIds;
213+ TMap<TRowVersion, THashSet<TActorId>> PendingTxId;
214+
144215}; // TSessionInfo
145216
146217struct TConnectionParams : std::tuple<TString, TString, bool , TString> {
@@ -365,22 +436,10 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
365436 session.StopWorker (this , id);
366437 }
367438
368- void SendTxIdResult (const TActorId& recipient, const TMap<TRowVersion, ui64>& result) {
369- auto ev = MakeHolder<TEvService::TEvTxIdResult>();
370-
371- for (const auto & [version, txId] : result) {
372- auto & item = *ev->Record .AddVersionTxIds ();
373- version.ToProto (item.MutableVersion ());
374- item.SetTxId (txId);
375- }
376-
377- Send (recipient, ev.Release ());
378- }
379-
380439 void Handle (TEvService::TEvGetTxId::TPtr& ev) {
381440 LOG_D (" Handle " << ev->Get ()->ToString ());
382441
383- const auto * session = SessionFromWorker (ev->Sender );
442+ auto * session = SessionFromWorker (ev->Sender );
384443 if (!session) {
385444 return ;
386445 }
@@ -391,26 +450,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
391450 return ;
392451 }
393452
394- TMap<TRowVersion, ui64> result;
395- TVector<TRowVersion> versionsWithoutTxId;
396-
397- for (const auto & v : ev->Get ()->Record .GetVersions ()) {
398- const auto version = TRowVersion::FromProto (v);
399- if (auto it = TxIds.upper_bound (version); it != TxIds.end ()) {
400- result[it->first ] = it->second ;
401- } else {
402- versionsWithoutTxId.push_back (version);
403- PendingTxId[version].insert (ev->Sender );
404- }
405- }
406-
407- if (versionsWithoutTxId) {
408- Send (*session, new TEvService::TEvGetTxId (versionsWithoutTxId));
409- }
410-
411- if (result) {
412- SendTxIdResult (ev->Sender , result);
413- }
453+ session->Handle (this , ev);
414454 }
415455
416456 void Handle (TEvService::TEvTxIdResult::TPtr& ev) {
@@ -436,34 +476,13 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
436476 return ;
437477 }
438478
439- THashMap<TActorId, TMap<TRowVersion, ui64>> results;
440-
441- for (const auto & kv : record.GetVersionTxIds ()) {
442- const auto version = TRowVersion::FromProto (kv.GetVersion ());
443- TxIds.emplace (version, kv.GetTxId ());
444-
445- for (auto it = PendingTxId.begin (); it != PendingTxId.end ();) {
446- if (it->first >= version) {
447- break ;
448- }
449-
450- for (const auto & actorId : it->second ) {
451- results[actorId].emplace (version, kv.GetTxId ());
452- }
453-
454- PendingTxId.erase (it++);
455- }
456- }
457-
458- for (const auto & [actorId, result] : results) {
459- SendTxIdResult (actorId, result);
460- }
479+ session.Handle (this , ev);
461480 }
462481
463482 void Handle (TEvService::TEvHeartbeat::TPtr& ev) {
464483 LOG_D (" Handle " << ev->Get ()->ToString ());
465484
466- const auto * session = SessionFromWorker (ev->Sender );
485+ auto * session = SessionFromWorker (ev->Sender );
467486 if (!session) {
468487 return ;
469488 }
@@ -474,14 +493,10 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
474493 return ;
475494 }
476495
477- auto & record = ev->Get ()->Record ;
478-
479496 LOG_I (" Heartbeat"
480497 << " : worker# " << ev->Sender
481- << " , version# " << TRowVersion::FromProto (record.GetVersion ()));
482-
483- session->GetWorkerId (ev->Sender ).Serialize (*record.MutableWorker ());
484- Send (ev->Forward (*session));
498+ << " , version# " << TRowVersion::FromProto (ev->Get ()->Record .GetVersion ()));
499+ session->Handle (this , ev);
485500 }
486501
487502 void Handle (TEvWorker::TEvDataEnd::TPtr& ev) {
@@ -608,8 +623,6 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
608623 THashMap<ui64, TSessionInfo> Sessions;
609624 THashMap<TConnectionParams, TActorId> YdbProxies;
610625 THashMap<TActorId, ui64> WorkerActorIdToSession;
611- TMap<TRowVersion, ui64> TxIds;
612- TMap<TRowVersion, THashSet<TActorId>> PendingTxId;
613626
614627}; // TReplicationService
615628
0 commit comments