Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/counters_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ enum ETxTypes {
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
}
14 changes: 11 additions & 3 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,19 @@ message TEvStopWorker {

message TEvWorkerStatus {
enum EStatus {
UNKNOWN = 0;
RUNNING = 1;
STOPPED = 2;
STATUS_UNSPECIFIED = 0;
STATUS_RUNNING = 1;
STATUS_STOPPED = 2;
}

enum EReason {
REASON_UNSPECIFIED = 0;
REASON_ACK = 1;
REASON_ERROR = 2;
}

optional TWorkerIdentity Worker = 1;
optional EStatus Status = 2;
optional EReason Reason = 3;
optional string ErrorDescription = 4;
}
22 changes: 13 additions & 9 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,23 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
const auto id = TWorkerId::Parse(record.GetWorker());

switch (record.GetStatus()) {
case NKikimrReplication::TEvWorkerStatus::RUNNING:
case NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING:
if (!session.HasWorker(id)) {
StopQueue.emplace(id, nodeId);
}
break;
case NKikimrReplication::TEvWorkerStatus::STOPPED:
case NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED:
if (!MaybeRemoveWorker(id, ctx)) {
session.DetachWorker(id);
if (IsValidWorker(id)) {
auto* worker = GetOrCreateWorker(id);
worker->ClearSession();
if (worker->HasCommand()) {
BootQueue.insert(id);
if (record.GetReason() == NKikimrReplication::TEvWorkerStatus::REASON_ERROR) {
RunTxWorkerError(id, record.GetErrorDescription(), ctx);
} else {
session.DetachWorker(id);
if (IsValidWorker(id)) {
auto* worker = GetOrCreateWorker(id);
worker->ClearSession();
if (worker->HasCommand()) {
BootQueue.insert(id);
}
}
}
}
Expand Down Expand Up @@ -577,7 +581,7 @@ void TController::Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorCont

void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
LOG_D("Remove worker"
<< ", workerId# " << id);
<< ": workerId# " << id);

Y_ABORT_UNLESS(RemoveQueue.contains(id));

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class TController
class TTxAlterDstResult;
class TTxDropDstResult;
class TTxResolveSecretResult;
class TTxWorkerError;

// tx runners
void RunTxInitSchema(const TActorContext& ctx);
Expand All @@ -134,6 +135,7 @@ class TController
void RunTxAlterDstResult(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
void RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx);

// other
template <typename T>
Expand Down
72 changes: 72 additions & 0 deletions ydb/core/tx/replication/controller/tx_worker_error.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include "controller_impl.h"

namespace NKikimr::NReplication::NController {

class TController::TTxWorkerError: public TTxBase {
const TWorkerId WorkerId;
const TString Error;

public:
explicit TTxWorkerError(TController* self, const TWorkerId& id, const TString& error)
: TTxBase("TxWorkerError", self)
, WorkerId(id)
, Error(error)
{
}

TTxType GetTxType() const override {
return TXTYPE_WORKER_ERROR;
}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
CLOG_D(ctx, "Execute"
<< ": workerId# " << WorkerId
<< ", error# " << Error);

auto replication = Self->Find(WorkerId.ReplicationId());
if (!replication) {
CLOG_W(ctx, "Unknown replication"
<< ": rid# " << WorkerId.ReplicationId());
return true;
}

auto* target = replication->FindTarget(WorkerId.TargetId());
if (!target) {
CLOG_W(ctx, "Unknown target"
<< ": rid# " << WorkerId.ReplicationId()
<< ", tid# " << WorkerId.TargetId());
return true;
}

CLOG_E(ctx, "Worker error"
<< ": rid# " << WorkerId.ReplicationId()
<< ", tid# " << WorkerId.TargetId()
<< ", error# " << Error);

replication->SetState(TReplication::EState::Error);
target->SetDstState(TReplication::EDstState::Error);
target->SetIssue(Error);

NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Replications>().Key(WorkerId.ReplicationId()).Update(
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState())
);
db.Table<Schema::Targets>().Key(WorkerId.ReplicationId(), WorkerId.TargetId()).Update(
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
NIceDb::TUpdate<Schema::Targets::Issue>(target->GetIssue())
);

return true;
}

void Complete(const TActorContext& ctx) override {
CLOG_D(ctx, "Complete");
}

}; // TTxWorkerError

void TController::RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx) {
Execute(new TTxWorkerError(this, id, error), ctx);
}

}
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ SRCS(
tx_init.cpp
tx_init_schema.cpp
tx_resolve_secret_result.cpp
tx_worker_error.cpp
)

GENERATE_ENUM_SERIALIZATION(replication.h)
Expand Down
29 changes: 20 additions & 9 deletions ydb/core/tx/replication/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TSessionInfo {
const auto actorId = res.first->second;
ActorIdToWorkerId.emplace(actorId, id);

SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING);
return actorId;
}

Expand All @@ -71,25 +71,27 @@ class TSessionInfo {
Y_ABORT_UNLESS(it != Workers.end());

ops->Send(it->second, new TEvents::TEvPoison());
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STOPPED);
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED);

ActorIdToWorkerId.erase(it->second);
Workers.erase(it);
}

void StopWorker(IActorOps* ops, const TActorId& id) {
template <typename... Args>
void StopWorker(IActorOps* ops, const TActorId& id, Args&&... args) {
auto it = ActorIdToWorkerId.find(id);
Y_ABORT_UNLESS(it != ActorIdToWorkerId.end());

// actor already stopped
SendWorkerStatus(ops, it->second, NKikimrReplication::TEvWorkerStatus::STOPPED);
SendWorkerStatus(ops, it->second, NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED, std::forward<Args>(args)...);

Workers.erase(it->second);
ActorIdToWorkerId.erase(it);
}

void SendWorkerStatus(IActorOps* ops, const TWorkerId& id, NKikimrReplication::TEvWorkerStatus::EStatus status) {
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, status));
template <typename... Args>
void SendWorkerStatus(IActorOps* ops, const TWorkerId& id, Args&&... args) {
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, std::forward<Args>(args)...));
}

void SendStatus(IActorOps* ops) const {
Expand Down Expand Up @@ -273,7 +275,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
}

if (session.HasWorker(id)) {
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING);
}

LOG_I("Run worker"
Expand Down Expand Up @@ -315,7 +317,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
}

if (!session.HasWorker(id)) {
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STOPPED);
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED);
}

LOG_I("Stop worker"
Expand Down Expand Up @@ -353,7 +355,16 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
LOG_I("Worker has gone"
<< ": worker# " << ev->Sender);
WorkerActorIdToSession.erase(ev->Sender);
session.StopWorker(this, ev->Sender);
session.StopWorker(this, ev->Sender, ToReason(ev->Get()->Status), ev->Get()->ErrorDescription);
}

static NKikimrReplication::TEvWorkerStatus::EReason ToReason(TEvWorker::TEvGone::EStatus status) {
switch (status) {
case TEvWorker::TEvGone::SCHEME_ERROR:
return NKikimrReplication::TEvWorkerStatus::REASON_ERROR;
default:
return NKikimrReplication::TEvWorkerStatus::REASON_UNSPECIFIED;
}
}

void PassAway() override {
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/replication/service/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ struct TEvService {
explicit TEvWorkerStatus(const TWorkerId& id, NKikimrReplication::TEvWorkerStatus::EStatus status) {
id.Serialize(*Record.MutableWorker());
Record.SetStatus(status);
Record.SetReason(NKikimrReplication::TEvWorkerStatus::REASON_ACK);
}

explicit TEvWorkerStatus(const TWorkerId& id,
NKikimrReplication::TEvWorkerStatus::EStatus status,
NKikimrReplication::TEvWorkerStatus::EReason reason,
const TString& errorDescription
) {
id.Serialize(*Record.MutableWorker());
Record.SetStatus(status);
Record.SetReason(reason);
Record.SetErrorDescription(errorDescription);
}
};
};
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class TLocalTableWriter

void LogCritAndLeave(const TString& error) {
LOG_C(error);
Leave(TEvWorker::TEvGone::SCHEME_ERROR);
Leave(TEvWorker::TEvGone::SCHEME_ERROR, error);
}

void LogWarnAndRetry(const TString& error) {
Expand Down Expand Up @@ -487,7 +487,7 @@ class TLocalTableWriter
LOG_D("Handle " << ev->Get()->ToString());

if (ev->Get()->HardError) {
Leave(TEvWorker::TEvGone::SCHEME_ERROR);
Leave(TEvWorker::TEvGone::SCHEME_ERROR, "Cannot apply changes");
} else {
OnGone(ev->Get()->PartitionId);
}
Expand All @@ -497,11 +497,11 @@ class TLocalTableWriter
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
}

void Leave(TEvWorker::TEvGone::EStatus status) {
LOG_I("Leave"
<< ": status# " << status);
template <typename... Args>
void Leave(Args&&... args) {
LOG_I("Leave");

Send(Worker, new TEvWorker::TEvGone(status));
Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
PassAway();
}

Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {

switch (ev->Get()->Result.GetStatus()) {
case NYdb::EStatus::SCHEME_ERROR:
return Leave(TEvWorker::TEvGone::SCHEME_ERROR);
return Leave(TEvWorker::TEvGone::SCHEME_ERROR, ev->Get()->Result.GetIssues().ToOneLineString());
default:
return Leave(TEvWorker::TEvGone::UNAVAILABLE);
}
}

void Leave(TEvWorker::TEvGone::EStatus status) {
LOG_I("Leave"
<< ": status# " << status);
template <typename... Args>
void Leave(Args&&... args) {
LOG_I("Leave");

Send(Worker, new TEvWorker::TEvGone(status));
Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
PassAway();
}

Expand Down
23 changes: 14 additions & 9 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ TString TEvWorker::TEvData::ToString() const {
<< " }";
}

TEvWorker::TEvGone::TEvGone(EStatus status)
TEvWorker::TEvGone::TEvGone(EStatus status, const TString& errorDescription)
: Status(status)
, ErrorDescription(errorDescription)
{
}

TString TEvWorker::TEvGone::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Status: " << Status
<< " ErrorDescription: " << ErrorDescription
<< " }";
}

Expand Down Expand Up @@ -172,34 +174,37 @@ class TWorker: public TActorBootstrapped<TWorker> {
if (ev->Sender == Reader) {
LOG_I("Reader has gone"
<< ": sender# " << ev->Sender);
MaybeRecreateActor(ev->Get()->Status, Reader);
MaybeRecreateActor(ev, Reader);
} else if (ev->Sender == Writer) {
LOG_I("Writer has gone"
<< ": sender# " << ev->Sender);
MaybeRecreateActor(ev->Get()->Status, Writer);
MaybeRecreateActor(ev, Writer);
} else {
LOG_W("Unknown actor has gone"
<< ": sender# " << ev->Sender);
}
}

void MaybeRecreateActor(TEvWorker::TEvGone::EStatus status, TActorInfo& info) {
switch (status) {
void MaybeRecreateActor(TEvWorker::TEvGone::TPtr& ev, TActorInfo& info) {
switch (ev->Get()->Status) {
case TEvWorker::TEvGone::UNAVAILABLE:
if (info.GetCreateAttempt() < MaxAttempts) {
return info.Register(this);
}
[[fallthrough]];
default:
return Leave(status);
return Leave(ev);
}
}

void Leave(TEvWorker::TEvGone::EStatus status) {
void Leave(TEvWorker::TEvGone::TPtr& ev) {
LOG_I("Leave"
<< ": status# " << status);
<< ": status# " << ev->Get()->Status
<< ", error# " << ev->Get()->ErrorDescription);

ev->Sender = SelfId();
Send(ev->Forward(Parent));

Send(Parent, new TEvWorker::TEvGone(status));
PassAway();
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ struct TEvWorker {
};

EStatus Status;
TString ErrorDescription;

explicit TEvGone(EStatus status);
explicit TEvGone(EStatus status, const TString& errorDescription = {});
TString ToString() const override;
};
};
Expand Down