@@ -45,21 +45,51 @@ class TSessionInfo {
4545 return Workers.contains (id);
4646 }
4747
48- void RegisterWorker (IActorOps* ops, const TWorkerId& id, IActor* actor) {
// Overload keyed by actor id: returns true when `id` is a key of ActorIdToWorkerId.
// Within the visible code, entries are added by RegisterWorker and removed by
// both StopWorker overloads.
48+ bool HasWorker (const TActorId& id) const {
49+ return ActorIdToWorkerId.contains (id);
50+ }
51+
// Returns the actor id stored in Workers for worker `id`.
// The worker must be present: an unknown id trips the Y_ABORT_UNLESS check
// instead of yielding a default value, so callers are expected to test
// HasWorker(id) first.
52+ TActorId GetWorkerActorId (const TWorkerId& id) const {
53+ auto it = Workers.find (id);
54+ Y_ABORT_UNLESS (it != Workers.end ());
55+ return it->second ;
56+ }
57+
// Registers `actor` through `ops` and records the resulting actor id under
// worker `id` in Workers, mirrors the pair in ActorIdToWorkerId, reports
// RUNNING via SendWorkerStatus and returns the actor id.
// Note that ops->Register (actor) is evaluated as an argument of emplace, i.e.
// before the duplicate check; an `id` already present in Workers trips
// Y_ABORT_UNLESS.
58+ TActorId RegisterWorker (IActorOps* ops, const TWorkerId& id, IActor* actor) {
4959 auto res = Workers.emplace (id, ops->Register (actor));
5060 Y_ABORT_UNLESS (res.second );
5161
// Removed line of the diff: the direct Send is superseded by the
// SendWorkerStatus call below.
52- ops->Send (ActorId, new TEvService::TEvWorkerStatus (id, NKikimrReplication::TEvWorkerStatus::RUNNING));
62+ const auto actorId = res.first ->second ;
// NOTE(review): the result of this emplace is not checked, unlike the
// Workers.emplace above - confirm a duplicate actor id cannot occur.
63+ ActorIdToWorkerId.emplace (actorId, id);
64+
65+ SendWorkerStatus (ops, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
66+ return actorId;
5367 }
5468
// Stops worker `id`: sends TEvPoison to its actor, reports STOPPED via
// SendWorkerStatus, then removes the worker from both maps.
// The worker must be present in Workers (Y_ABORT_UNLESS otherwise).
5569 void StopWorker (IActorOps* ops, const TWorkerId& id) {
5670 auto it = Workers.find (id);
5771 Y_ABORT_UNLESS (it != Workers.end ());
5872
5973 ops->Send (it->second , new TEvents::TEvPoison ());
74+ SendWorkerStatus (ops, id, NKikimrReplication::TEvWorkerStatus::STOPPED);
75+
// The reverse entry is erased first because it->second is read from the
// Workers entry that the following line erases.
76+ ActorIdToWorkerId.erase (it->second );
6077 Workers.erase (it);
78+ }
79+
// Overload keyed by actor id. Unlike the TWorkerId overload it sends no
// TEvPoison (per the inline comment, the actor has already stopped): it only
// reports STOPPED for the mapped worker id and removes the entries from both
// maps. The actor id must be present in ActorIdToWorkerId (Y_ABORT_UNLESS
// otherwise).
80+ void StopWorker (IActorOps* ops, const TActorId& id) {
81+ auto it = ActorIdToWorkerId.find (id);
82+ Y_ABORT_UNLESS (it != ActorIdToWorkerId.end ());
83+
84+ // actor already stopped
85+ SendWorkerStatus (ops, it->second , NKikimrReplication::TEvWorkerStatus::STOPPED);
86+
// Workers is erased by key (it->second) while `it` is still valid; the
// entry `it` points at is erased last.
87+ Workers.erase (it->second );
88+ ActorIdToWorkerId.erase (it);
89+ }
6190
62- ops->Send (ActorId, new TEvService::TEvWorkerStatus (id, NKikimrReplication::TEvWorkerStatus::STOPPED));
// Sends a TEvService::TEvWorkerStatus built from (id, status) to the ActorId
// member of this session object, using `ops` as the sender interface.
// Used for both RUNNING and STOPPED notifications in the visible code.
91+ void SendWorkerStatus (IActorOps* ops, const TWorkerId& id, NKikimrReplication::TEvWorkerStatus::EStatus status) {
92+ ops->Send (ActorId, new TEvService::TEvWorkerStatus (id, status));
6393 }
6494
6595 void SendStatus (IActorOps* ops) const {
@@ -83,6 +113,7 @@ class TSessionInfo {
83113 TActorId ActorId;
84114 ui64 Generation;
85115 THashMap<TWorkerId, TActorId> Workers;
116+ THashMap<TActorId, TWorkerId> ActorIdToWorkerId;
86117
87118}; // TSessionInfo
88119
@@ -243,7 +274,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
243274 }
244275
245276 if (session.HasWorker (id)) {
246- return ;
277+ return session. SendWorkerStatus ( this , id, NKikimrReplication::TEvWorkerStatus::RUNNING) ;
247278 }
248279
249280 LOG_I (" Run worker"
@@ -253,7 +284,9 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
253284 // TODO: validate settings
254285 const auto & readerSettings = cmd.GetRemoteTopicReader ();
255286 const auto & writerSettings = cmd.GetLocalTableWriter ();
256- session.RegisterWorker (this , id, CreateWorker (ReaderFn (readerSettings), WriterFn (writerSettings)));
287+ const auto actorId = session.RegisterWorker (this , id,
288+ CreateWorker (SelfId (), ReaderFn (readerSettings), WriterFn (writerSettings)));
289+ WorkerActorIdToSession[actorId] = controller.GetTabletId ();
257290 }
258291
259292 void Handle (TEvService::TEvStopWorker::TPtr& ev) {
@@ -282,11 +315,46 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
282315 return ;
283316 }
284317
285- if (session.HasWorker (id)) {
286- LOG_I (" Stop worker"
287- << " : worker# " << id);
288- session.StopWorker (this , id);
318+ if (!session.HasWorker (id)) {
319+ return session.SendWorkerStatus (this , id, NKikimrReplication::TEvWorkerStatus::STOPPED);
289320 }
321+
322+ LOG_I (" Stop worker"
323+ << " : worker# " << id);
324+ WorkerActorIdToSession.erase (session.GetWorkerActorId (id));
325+ session.StopWorker (this , id);
326+ }
327+
// Handles TEvWorker::TEvGone; the worker is identified by ev->Sender.
// Resolution goes sender -> session key (WorkerActorIdToSession) -> session
// (Sessions). On success the service-level mapping is erased and the session's
// StopWorker is called with the sender (presumably resolving to the TActorId
// overload, which reports STOPPED and removes the session-level entries).
// An unknown sender, a missing session, or a sender the session does not track
// is logged and otherwise ignored.
// NOTE(review): the two LOG_E early returns leave the sender's entry in
// WorkerActorIdToSession - confirm whether it should be erased there as well.
328+ void Handle (TEvWorker::TEvGone::TPtr& ev) {
329+ LOG_T (" Handle " << ev->Get ()->ToString ());
330+
331+ auto wit = WorkerActorIdToSession.find (ev->Sender );
332+ if (wit == WorkerActorIdToSession.end ()) {
333+ LOG_W (" Unknown worker has gone"
334+ << " : worker# " << ev->Sender );
335+ return ;
336+ }
337+
338+ auto it = Sessions.find (wit->second );
339+ if (it == Sessions.end ()) {
340+ LOG_E (" Cannot find session"
341+ << " : worker# " << ev->Sender
342+ << " , session# " << wit->second );
343+ return ;
344+ }
345+
346+ auto & session = it->second ;
347+ if (!session.HasWorker (ev->Sender )) {
348+ LOG_E (" Cannot find worker"
349+ << " : worker# " << ev->Sender
350+ << " , session# " << wit->second );
351+ return ;
352+ }
353+
354+ LOG_I (" Worker has gone"
355+ << " : worker# " << ev->Sender );
// `wit` refers to the entry erased on the next line and must not be used
// past this point (it is not).
356+ WorkerActorIdToSession.erase (ev->Sender );
357+ session.StopWorker (this , ev->Sender );
290358 }
291359
292360 void PassAway () override {
@@ -320,6 +388,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
320388 hFunc (TEvService::TEvHandshake, Handle);
321389 hFunc (TEvService::TEvRunWorker, Handle);
322390 hFunc (TEvService::TEvStopWorker, Handle);
391+ hFunc (TEvWorker::TEvGone, Handle);
323392 sFunc (TEvents::TEvPoison, PassAway);
324393 }
325394 }
@@ -329,6 +398,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
329398 TActorId BoardPublisher;
330399 THashMap<ui64, TSessionInfo> Sessions;
331400 THashMap<TCredentialsKey, TActorId> YdbProxies;
401+ THashMap<TActorId, ui64> WorkerActorIdToSession;
332402
333403}; // TReplicationService
334404
0 commit comments