Skip to content

Commit 2bd89b3

Browse files
committed
Handle worker error
1 parent e41e3c9 commit 2bd89b3

File tree

12 files changed

+158
-42
lines changed

12 files changed

+158
-42
lines changed

ydb/core/protos/counters_replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ enum ETxTypes {
3737
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
3838
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
3939
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
40+
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
4041
}

ydb/core/protos/replication.proto

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,11 +213,19 @@ message TEvStopWorker {
213213

214214
message TEvWorkerStatus {
215215
enum EStatus {
216-
UNKNOWN = 0;
217-
RUNNING = 1;
218-
STOPPED = 2;
216+
STATUS_UNSPECIFIED = 0;
217+
STATUS_RUNNING = 1;
218+
STATUS_STOPPED = 2;
219+
}
220+
221+
enum EReason {
222+
REASON_UNSPECIFIED = 0;
223+
REASON_ACK = 1;
224+
REASON_ERROR = 2;
219225
}
220226

221227
optional TWorkerIdentity Worker = 1;
222228
optional EStatus Status = 2;
229+
optional EReason Reason = 3;
230+
optional string ErrorDescription = 4;
223231
}

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,19 +338,23 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
338338
const auto id = TWorkerId::Parse(record.GetWorker());
339339

340340
switch (record.GetStatus()) {
341-
case NKikimrReplication::TEvWorkerStatus::RUNNING:
341+
case NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING:
342342
if (!session.HasWorker(id)) {
343343
StopQueue.emplace(id, nodeId);
344344
}
345345
break;
346-
case NKikimrReplication::TEvWorkerStatus::STOPPED:
346+
case NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED:
347347
if (!MaybeRemoveWorker(id, ctx)) {
348-
session.DetachWorker(id);
349-
if (IsValidWorker(id)) {
350-
auto* worker = GetOrCreateWorker(id);
351-
worker->ClearSession();
352-
if (worker->HasCommand()) {
353-
BootQueue.insert(id);
348+
if (record.GetReason() == NKikimrReplication::TEvWorkerStatus::REASON_ERROR) {
349+
RunTxWorkerError(id, record.GetErrorDescription(), ctx);
350+
} else {
351+
session.DetachWorker(id);
352+
if (IsValidWorker(id)) {
353+
auto* worker = GetOrCreateWorker(id);
354+
worker->ClearSession();
355+
if (worker->HasCommand()) {
356+
BootQueue.insert(id);
357+
}
354358
}
355359
}
356360
}
@@ -577,7 +581,7 @@ void TController::Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorCont
577581

578582
void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
579583
LOG_D("Remove worker"
580-
<< ", workerId# " << id);
584+
<< ": workerId# " << id);
581585

582586
Y_ABORT_UNLESS(RemoveQueue.contains(id));
583587

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TController
117117
class TTxAlterDstResult;
118118
class TTxDropDstResult;
119119
class TTxResolveSecretResult;
120+
class TTxWorkerError;
120121

121122
// tx runners
122123
void RunTxInitSchema(const TActorContext& ctx);
@@ -134,6 +135,7 @@ class TController
134135
void RunTxAlterDstResult(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx);
135136
void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
136137
void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
138+
void RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx);
137139

138140
// other
139141
template <typename T>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#include "controller_impl.h"
2+
3+
namespace NKikimr::NReplication::NController {
4+
5+
class TController::TTxWorkerError: public TTxBase {
6+
const TWorkerId WorkerId;
7+
const TString Error;
8+
9+
public:
10+
explicit TTxWorkerError(TController* self, const TWorkerId& id, const TString& error)
11+
: TTxBase("TxWorkerError", self)
12+
, WorkerId(id)
13+
, Error(error)
14+
{
15+
}
16+
17+
TTxType GetTxType() const override {
18+
return TXTYPE_WORKER_ERROR;
19+
}
20+
21+
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
22+
CLOG_D(ctx, "Execute"
23+
<< ": workerId# " << WorkerId
24+
<< ", error# " << Error);
25+
26+
auto replication = Self->Find(WorkerId.ReplicationId());
27+
if (!replication) {
28+
CLOG_W(ctx, "Unknown replication"
29+
<< ": rid# " << WorkerId.ReplicationId());
30+
return true;
31+
}
32+
33+
auto* target = replication->FindTarget(WorkerId.TargetId());
34+
if (!target) {
35+
CLOG_W(ctx, "Unknown target"
36+
<< ": rid# " << WorkerId.ReplicationId()
37+
<< ", tid# " << WorkerId.TargetId());
38+
return true;
39+
}
40+
41+
CLOG_E(ctx, "Worker error"
42+
<< ": rid# " << WorkerId.ReplicationId()
43+
<< ", tid# " << WorkerId.TargetId()
44+
<< ", error# " << Error);
45+
46+
replication->SetState(TReplication::EState::Error);
47+
target->SetDstState(TReplication::EDstState::Error);
48+
target->SetIssue(Error);
49+
50+
NIceDb::TNiceDb db(txc.DB);
51+
db.Table<Schema::Replications>().Key(WorkerId.ReplicationId()).Update(
52+
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState())
53+
);
54+
db.Table<Schema::Targets>().Key(WorkerId.ReplicationId(), WorkerId.TargetId()).Update(
55+
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
56+
NIceDb::TUpdate<Schema::Targets::Issue>(target->GetIssue())
57+
);
58+
59+
return true;
60+
}
61+
62+
void Complete(const TActorContext& ctx) override {
63+
CLOG_D(ctx, "Complete");
64+
}
65+
66+
}; // TTxWorkerError
67+
68+
void TController::RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx) {
69+
Execute(new TTxWorkerError(this, id, error), ctx);
70+
}
71+
72+
}

ydb/core/tx/replication/controller/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ SRCS(
4747
tx_init.cpp
4848
tx_init_schema.cpp
4949
tx_resolve_secret_result.cpp
50+
tx_worker_error.cpp
5051
)
5152

5253
GENERATE_ENUM_SERIALIZATION(replication.h)

ydb/core/tx/replication/service/service.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class TSessionInfo {
6262
const auto actorId = res.first->second;
6363
ActorIdToWorkerId.emplace(actorId, id);
6464

65-
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
65+
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING);
6666
return actorId;
6767
}
6868

@@ -71,25 +71,27 @@ class TSessionInfo {
7171
Y_ABORT_UNLESS(it != Workers.end());
7272

7373
ops->Send(it->second, new TEvents::TEvPoison());
74-
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STOPPED);
74+
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED);
7575

7676
ActorIdToWorkerId.erase(it->second);
7777
Workers.erase(it);
7878
}
7979

80-
void StopWorker(IActorOps* ops, const TActorId& id) {
80+
template <typename... Args>
81+
void StopWorker(IActorOps* ops, const TActorId& id, Args&&... args) {
8182
auto it = ActorIdToWorkerId.find(id);
8283
Y_ABORT_UNLESS(it != ActorIdToWorkerId.end());
8384

8485
// actor already stopped
85-
SendWorkerStatus(ops, it->second, NKikimrReplication::TEvWorkerStatus::STOPPED);
86+
SendWorkerStatus(ops, it->second, NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED, std::forward<Args>(args)...);
8687

8788
Workers.erase(it->second);
8889
ActorIdToWorkerId.erase(it);
8990
}
9091

91-
void SendWorkerStatus(IActorOps* ops, const TWorkerId& id, NKikimrReplication::TEvWorkerStatus::EStatus status) {
92-
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, status));
92+
template <typename... Args>
93+
void SendWorkerStatus(IActorOps* ops, const TWorkerId& id, Args&&... args) {
94+
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, std::forward<Args>(args)...));
9395
}
9496

9597
void SendStatus(IActorOps* ops) const {
@@ -273,7 +275,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
273275
}
274276

275277
if (session.HasWorker(id)) {
276-
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
278+
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING);
277279
}
278280

279281
LOG_I("Run worker"
@@ -315,7 +317,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
315317
}
316318

317319
if (!session.HasWorker(id)) {
318-
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STOPPED);
320+
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED);
319321
}
320322

321323
LOG_I("Stop worker"
@@ -353,7 +355,16 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
353355
LOG_I("Worker has gone"
354356
<< ": worker# " << ev->Sender);
355357
WorkerActorIdToSession.erase(ev->Sender);
356-
session.StopWorker(this, ev->Sender);
358+
session.StopWorker(this, ev->Sender, ToReason(ev->Get()->Status), ev->Get()->ErrorDescription);
359+
}
360+
361+
static NKikimrReplication::TEvWorkerStatus::EReason ToReason(TEvWorker::TEvGone::EStatus status) {
362+
switch (status) {
363+
case TEvWorker::TEvGone::SCHEME_ERROR:
364+
return NKikimrReplication::TEvWorkerStatus::REASON_ERROR;
365+
default:
366+
return NKikimrReplication::TEvWorkerStatus::REASON_UNSPECIFIED;
367+
}
357368
}
358369

359370
void PassAway() override {

ydb/core/tx/replication/service/service.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,17 @@ struct TEvService {
5151
id.Serialize(*Record.MutableWorker());
5252
Record.SetStatus(status);
5353
}
54+
55+
explicit TEvWorkerStatus(const TWorkerId& id,
56+
NKikimrReplication::TEvWorkerStatus::EStatus status,
57+
NKikimrReplication::TEvWorkerStatus::EReason reason,
58+
const TString& errorDescription
59+
) {
60+
id.Serialize(*Record.MutableWorker());
61+
Record.SetStatus(status);
62+
Record.SetReason(reason);
63+
Record.SetErrorDescription(errorDescription);
64+
}
5465
};
5566
};
5667

ydb/core/tx/replication/service/table_writer.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ class TLocalTableWriter
217217

218218
void LogCritAndLeave(const TString& error) {
219219
LOG_C(error);
220-
Leave(TEvWorker::TEvGone::SCHEME_ERROR);
220+
Leave(TEvWorker::TEvGone::SCHEME_ERROR, error);
221221
}
222222

223223
void LogWarnAndRetry(const TString& error) {
@@ -487,7 +487,7 @@ class TLocalTableWriter
487487
LOG_D("Handle " << ev->Get()->ToString());
488488

489489
if (ev->Get()->HardError) {
490-
Leave(TEvWorker::TEvGone::SCHEME_ERROR);
490+
Leave(TEvWorker::TEvGone::SCHEME_ERROR, "Cannot apply changes");
491491
} else {
492492
OnGone(ev->Get()->PartitionId);
493493
}
@@ -497,11 +497,11 @@ class TLocalTableWriter
497497
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
498498
}
499499

500-
void Leave(TEvWorker::TEvGone::EStatus status) {
501-
LOG_I("Leave"
502-
<< ": status# " << status);
500+
template <typename... Args>
501+
void Leave(Args&&... args) {
502+
LOG_I("Leave");
503503

504-
Send(Worker, new TEvWorker::TEvGone(status));
504+
Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
505505
PassAway();
506506
}
507507

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,17 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
6969

7070
switch (ev->Get()->Result.GetStatus()) {
7171
case NYdb::EStatus::SCHEME_ERROR:
72-
return Leave(TEvWorker::TEvGone::SCHEME_ERROR);
72+
return Leave(TEvWorker::TEvGone::SCHEME_ERROR, ev->Get()->Result.GetIssues().ToOneLineString());
7373
default:
7474
return Leave(TEvWorker::TEvGone::UNAVAILABLE);
7575
}
7676
}
7777

78-
void Leave(TEvWorker::TEvGone::EStatus status) {
79-
LOG_I("Leave"
80-
<< ": status# " << status);
78+
template <typename... Args>
79+
void Leave(Args&&... args) {
80+
LOG_I("Leave");
8181

82-
Send(Worker, new TEvWorker::TEvGone(status));
82+
Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
8383
PassAway();
8484
}
8585

0 commit comments

Comments
 (0)