Skip to content

Commit 9f030a8

Browse files
authored
Merge 8300489 into 83145b0
2 parents 83145b0 + 8300489 commit 9f030a8

File tree

8 files changed

+110
-18
lines changed

8 files changed

+110
-18
lines changed

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,9 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
344344
}
345345
break;
346346
case NKikimrReplication::TEvWorkerStatus::STOPPED:
347-
MaybeRemoveWorker(id, ctx);
347+
if (!MaybeRemoveWorker(id, ctx)) {
348+
BootQueue.insert(id);
349+
}
348350
break;
349351
default:
350352
CLOG_W(ctx, "Unknown worker status"
@@ -589,10 +591,13 @@ void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
589591
target->Progress(ctx);
590592
}
591593

592-
void TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
593-
if (RemoveQueue.contains(id)) {
594-
RemoveWorker(id, ctx);
594+
bool TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
595+
if (!RemoveQueue.contains(id)) {
596+
return false;
595597
}
598+
599+
RemoveWorker(id, ctx);
600+
return true;
596601
}
597602

598603
void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class TController
100100
void BootWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd);
101101
void StopWorker(ui32 nodeId, const TWorkerId& id);
102102
void RemoveWorker(const TWorkerId& id, const TActorContext& ctx);
103-
void MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
103+
bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
104104

105105
// local transactions
106106
class TTxInitSchema;

ydb/core/tx/replication/service/service.cpp

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,47 @@ class TSessionInfo {
4545
return Workers.contains(id);
4646
}
4747

48-
void RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor) {
48+
bool HasWorker(const TActorId& id) const {
49+
return ActorIdToWorkerId.contains(id);
50+
}
51+
52+
TActorId GetWorkerActorId(const TWorkerId& id) const {
53+
auto it = Workers.find(id);
54+
Y_ABORT_UNLESS(it != Workers.end());
55+
return it->second;
56+
}
57+
58+
TActorId RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor) {
4959
auto res = Workers.emplace(id, ops->Register(actor));
5060
Y_ABORT_UNLESS(res.second);
5161

62+
const auto actorId = res.first->second;
63+
ActorIdToWorkerId.emplace(actorId, id);
64+
5265
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::RUNNING));
66+
return actorId;
5367
}
5468

5569
void StopWorker(IActorOps* ops, const TWorkerId& id) {
5670
auto it = Workers.find(id);
5771
Y_ABORT_UNLESS(it != Workers.end());
5872

5973
ops->Send(it->second, new TEvents::TEvPoison());
74+
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::STOPPED));
75+
76+
ActorIdToWorkerId.erase(it->second);
6077
Workers.erase(it);
78+
}
6179

62-
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::STOPPED));
80+
void StopWorker(IActorOps* ops, const TActorId& id) {
81+
auto it = ActorIdToWorkerId.find(id);
82+
Y_ABORT_UNLESS(it != ActorIdToWorkerId.end());
83+
84+
// actor already stopped
85+
ops->Send(ActorId, new TEvService::TEvWorkerStatus(it->second, NKikimrReplication::TEvWorkerStatus::STOPPED));
86+
87+
Workers.erase(it->second);
88+
ActorIdToWorkerId.erase(it);
6389
}
6490

6591
void SendStatus(IActorOps* ops) const {
@@ -83,6 +109,7 @@ class TSessionInfo {
83109
TActorId ActorId;
84110
ui64 Generation;
85111
THashMap<TWorkerId, TActorId> Workers;
112+
THashMap<TActorId, TWorkerId> ActorIdToWorkerId;
86113

87114
}; // TSessionInfo
88115

@@ -252,7 +279,9 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
252279
// TODO: validate settings
253280
const auto& readerSettings = cmd.GetRemoteTopicReader();
254281
const auto& writerSettings = cmd.GetLocalTableWriter();
255-
session.RegisterWorker(this, id, CreateWorker(ReaderFn(readerSettings), WriterFn(writerSettings)));
282+
const auto actorId = session.RegisterWorker(this, id,
283+
CreateWorker(SelfId(), ReaderFn(readerSettings), WriterFn(writerSettings)));
284+
WorkerActorIdToSession[actorId] = controller.GetTabletId();
256285
}
257286

258287
void Handle(TEvService::TEvStopWorker::TPtr& ev) {
@@ -284,10 +313,43 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
284313
if (session.HasWorker(id)) {
285314
LOG_I("Stop worker"
286315
<< ": worker# " << id);
316+
WorkerActorIdToSession.erase(session.GetWorkerActorId(id));
287317
session.StopWorker(this, id);
288318
}
289319
}
290320

321+
void Handle(TEvWorker::TEvGone::TPtr& ev) {
322+
LOG_T("Handle " << ev->Get()->ToString());
323+
324+
auto wit = WorkerActorIdToSession.find(ev->Sender);
325+
if (wit == WorkerActorIdToSession.end()) {
326+
LOG_W("Unknown worker has gone"
327+
<< ": worker# " << ev->Sender);
328+
return;
329+
}
330+
331+
auto it = Sessions.find(wit->second);
332+
if (it == Sessions.end()) {
333+
LOG_E("Cannot find session"
334+
<< ": worker# " << ev->Sender
335+
<< ", session# " << wit->second);
336+
return;
337+
}
338+
339+
auto& session = it->second;
340+
if (!session.HasWorker(ev->Sender)) {
341+
LOG_E("Cannot find worker"
342+
<< ": worker# " << ev->Sender
343+
<< ", session# " << wit->second);
344+
return;
345+
}
346+
347+
LOG_I("Worker has gone"
348+
<< ": worker# " << ev->Sender);
349+
WorkerActorIdToSession.erase(ev->Sender);
350+
session.StopWorker(this, ev->Sender);
351+
}
352+
291353
void PassAway() override {
292354
if (auto actorId = std::exchange(BoardPublisher, {})) {
293355
Send(actorId, new TEvents::TEvPoison());
@@ -319,6 +381,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
319381
hFunc(TEvService::TEvHandshake, Handle);
320382
hFunc(TEvService::TEvRunWorker, Handle);
321383
hFunc(TEvService::TEvStopWorker, Handle);
384+
hFunc(TEvWorker::TEvGone, Handle);
322385
sFunc(TEvents::TEvPoison, PassAway);
323386
}
324387
}
@@ -328,6 +391,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
328391
TActorId BoardPublisher;
329392
THashMap<ui64, TSessionInfo> Sessions;
330393
THashMap<TCredentialsKey, TActorId> YdbProxies;
394+
THashMap<TActorId, ui64> WorkerActorIdToSession;
331395

332396
}; // TReplicationService
333397

ydb/core/tx/replication/service/table_writer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
139139
}
140140

141141
void Leave(bool hardError = false) {
142+
LOG_I("Leave"
143+
<< ": hard error# " << hardError);
144+
142145
Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(TabletId, hardError));
143146
PassAway();
144147
}
@@ -495,6 +498,9 @@ class TLocalTableWriter
495498
}
496499

497500
void Leave(TEvWorker::TEvGone::EStatus status) {
501+
LOG_I("Leave"
502+
<< ": status# " << status);
503+
498504
Send(Worker, new TEvWorker::TEvGone(status));
499505
PassAway();
500506
}

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
7676
}
7777

7878
void Leave(TEvWorker::TEvGone::EStatus status) {
79-
LOG_I("Leave");
79+
LOG_I("Leave"
80+
<< ": status# " << status);
81+
8082
Send(Worker, new TEvWorker::TEvGone(status));
8183
PassAway();
8284
}

ydb/core/tx/replication/service/worker.cpp

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,15 @@ class TWorker: public TActorBootstrapped<TWorker> {
191191
}
192192
[[fallthrough]];
193193
default:
194-
return Leave();
194+
return Leave(status);
195195
}
196196
}
197197

198-
void Leave() {
199-
// TODO: signal to parent
198+
void Leave(TEvWorker::TEvGone::EStatus status) {
199+
LOG_I("Leave"
200+
<< ": status# " << status);
201+
202+
Send(Parent, new TEvWorker::TEvGone(status));
200203
PassAway();
201204
}
202205

@@ -213,8 +216,12 @@ class TWorker: public TActorBootstrapped<TWorker> {
213216
return NKikimrServices::TActivity::REPLICATION_WORKER;
214217
}
215218

216-
explicit TWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn)
217-
: Reader(std::move(createReaderFn))
219+
explicit TWorker(
220+
const TActorId& parent,
221+
std::function<IActor*(void)>&& createReaderFn,
222+
std::function<IActor*(void)>&& createWriterFn)
223+
: Parent(parent)
224+
, Reader(std::move(createReaderFn))
218225
, Writer(std::move(createWriterFn))
219226
{
220227
}
@@ -239,14 +246,19 @@ class TWorker: public TActorBootstrapped<TWorker> {
239246

240247
private:
241248
static constexpr ui32 MaxAttempts = 3;
249+
const TActorId Parent;
242250
mutable TMaybe<TString> LogPrefix;
243251
TActorInfo Reader;
244252
TActorInfo Writer;
245253
THolder<TEvWorker::TEvData> InFlightData;
246254
};
247255

248-
IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn) {
249-
return new TWorker(std::move(createReaderFn), std::move(createWriterFn));
256+
IActor* CreateWorker(
257+
const TActorId& parent,
258+
std::function<IActor*(void)>&& createReaderFn,
259+
std::function<IActor*(void)>&& createWriterFn)
260+
{
261+
return new TWorker(parent, std::move(createReaderFn), std::move(createWriterFn));
250262
}
251263

252264
}

ydb/core/tx/replication/service/worker.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ struct TEvWorker {
5959
};
6060
};
6161

62-
IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn);
62+
IActor* CreateWorker(
63+
const TActorId& parent,
64+
std::function<IActor*(void)>&& createReaderFn,
65+
std::function<IActor*(void)>&& createWriterFn);
6366

6467
}
6568

ydb/core/tx/replication/service/worker_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(Worker) {
5656
return CreateLocalTableWriter(tablePathId);
5757
};
5858

59-
auto worker = env.GetRuntime().Register(CreateWorker(std::move(createReaderFn), std::move(createWriterFn)));
59+
auto worker = env.GetRuntime().Register(CreateWorker(env.GetSender(), std::move(createReaderFn), std::move(createWriterFn)));
6060
Y_UNUSED(worker);
6161

6262
UNIT_ASSERT(WriteTopic(env, "/Root/topic", R"({"key":[1], "update":{"value":"10"}})"));

0 commit comments

Comments
 (0)