Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
return;
}

const auto& session = Sessions[nodeId];
auto& session = Sessions[nodeId];
const auto& record = ev->Get()->Record;
const auto id = TWorkerId::Parse(record.GetWorker());

Expand All @@ -344,7 +344,16 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
}
break;
case NKikimrReplication::TEvWorkerStatus::STOPPED:
MaybeRemoveWorker(id, ctx);
if (!MaybeRemoveWorker(id, ctx)) {
session.DetachWorker(id);
if (IsValidWorker(id)) {
auto* worker = GetOrCreateWorker(id);
worker->ClearSession();
if (worker->HasCommand()) {
BootQueue.insert(id);
}
}
}
break;
default:
CLOG_W(ctx, "Unknown worker status"
Expand Down Expand Up @@ -589,10 +598,13 @@ void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
target->Progress(ctx);
}

void TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
if (RemoveQueue.contains(id)) {
RemoveWorker(id, ctx);
bool TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
if (!RemoveQueue.contains(id)) {
return false;
}

RemoveWorker(id, ctx);
return true;
}

void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TController
void BootWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd);
void StopWorker(ui32 nodeId, const TWorkerId& id);
void RemoveWorker(const TWorkerId& id, const TActorContext& ctx);
void MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);

// local transactions
class TTxInitSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ class TController::TTxResolveSecretResult: public TTxBase {
return true;
}

if (Replication->GetState() != TReplication::EState::Ready) {
CLOG_W(ctx, "Replication state mismatch"
<< ": rid# " << rid
<< ", state# " << Replication->GetState());
return true;
}

if (Ev->Get()->IsSuccess()) {
CLOG_N(ctx, "Secret resolved"
<< ": rid# " << rid);
Expand Down
88 changes: 79 additions & 9 deletions ydb/core/tx/replication/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,51 @@ class TSessionInfo {
return Workers.contains(id);
}

void RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor) {
bool HasWorker(const TActorId& id) const {
return ActorIdToWorkerId.contains(id);
}

TActorId GetWorkerActorId(const TWorkerId& id) const {
auto it = Workers.find(id);
Y_ABORT_UNLESS(it != Workers.end());
return it->second;
}

TActorId RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor) {
auto res = Workers.emplace(id, ops->Register(actor));
Y_ABORT_UNLESS(res.second);

ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::RUNNING));
const auto actorId = res.first->second;
ActorIdToWorkerId.emplace(actorId, id);

SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
return actorId;
}

void StopWorker(IActorOps* ops, const TWorkerId& id) {
auto it = Workers.find(id);
Y_ABORT_UNLESS(it != Workers.end());

ops->Send(it->second, new TEvents::TEvPoison());
SendWorkerStatus(ops, id, NKikimrReplication::TEvWorkerStatus::STOPPED);

ActorIdToWorkerId.erase(it->second);
Workers.erase(it);
}

void StopWorker(IActorOps* ops, const TActorId& id) {
auto it = ActorIdToWorkerId.find(id);
Y_ABORT_UNLESS(it != ActorIdToWorkerId.end());

// actor already stopped
SendWorkerStatus(ops, it->second, NKikimrReplication::TEvWorkerStatus::STOPPED);

Workers.erase(it->second);
ActorIdToWorkerId.erase(it);
}

ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::STOPPED));
void SendWorkerStatus(IActorOps* ops, const TWorkerId& id, NKikimrReplication::TEvWorkerStatus::EStatus status) {
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, status));
}

void SendStatus(IActorOps* ops) const {
Expand All @@ -83,6 +113,7 @@ class TSessionInfo {
TActorId ActorId;
ui64 Generation;
THashMap<TWorkerId, TActorId> Workers;
THashMap<TActorId, TWorkerId> ActorIdToWorkerId;

}; // TSessionInfo

Expand Down Expand Up @@ -242,7 +273,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
}

if (session.HasWorker(id)) {
return;
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::RUNNING);
}

LOG_I("Run worker"
Expand All @@ -252,7 +283,9 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
// TODO: validate settings
const auto& readerSettings = cmd.GetRemoteTopicReader();
const auto& writerSettings = cmd.GetLocalTableWriter();
session.RegisterWorker(this, id, CreateWorker(ReaderFn(readerSettings), WriterFn(writerSettings)));
const auto actorId = session.RegisterWorker(this, id,
CreateWorker(SelfId(), ReaderFn(readerSettings), WriterFn(writerSettings)));
WorkerActorIdToSession[actorId] = controller.GetTabletId();
}

void Handle(TEvService::TEvStopWorker::TPtr& ev) {
Expand Down Expand Up @@ -281,11 +314,46 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
return;
}

if (session.HasWorker(id)) {
LOG_I("Stop worker"
<< ": worker# " << id);
session.StopWorker(this, id);
if (!session.HasWorker(id)) {
return session.SendWorkerStatus(this, id, NKikimrReplication::TEvWorkerStatus::STOPPED);
}

LOG_I("Stop worker"
<< ": worker# " << id);
WorkerActorIdToSession.erase(session.GetWorkerActorId(id));
session.StopWorker(this, id);
}

void Handle(TEvWorker::TEvGone::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());

auto wit = WorkerActorIdToSession.find(ev->Sender);
if (wit == WorkerActorIdToSession.end()) {
LOG_W("Unknown worker has gone"
<< ": worker# " << ev->Sender);
return;
}

auto it = Sessions.find(wit->second);
if (it == Sessions.end()) {
LOG_E("Cannot find session"
<< ": worker# " << ev->Sender
<< ", session# " << wit->second);
return;
}

auto& session = it->second;
if (!session.HasWorker(ev->Sender)) {
LOG_E("Cannot find worker"
<< ": worker# " << ev->Sender
<< ", session# " << wit->second);
return;
}

LOG_I("Worker has gone"
<< ": worker# " << ev->Sender);
WorkerActorIdToSession.erase(ev->Sender);
session.StopWorker(this, ev->Sender);
}

void PassAway() override {
Expand Down Expand Up @@ -319,6 +387,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
hFunc(TEvService::TEvHandshake, Handle);
hFunc(TEvService::TEvRunWorker, Handle);
hFunc(TEvService::TEvStopWorker, Handle);
hFunc(TEvWorker::TEvGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
Expand All @@ -328,6 +397,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
TActorId BoardPublisher;
THashMap<ui64, TSessionInfo> Sessions;
THashMap<TCredentialsKey, TActorId> YdbProxies;
THashMap<TActorId, ui64> WorkerActorIdToSession;

}; // TReplicationService

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
}

void Leave(bool hardError = false) {
LOG_I("Leave"
<< ": hard error# " << hardError);

Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(TabletId, hardError));
PassAway();
}
Expand Down Expand Up @@ -495,6 +498,9 @@ class TLocalTableWriter
}

void Leave(TEvWorker::TEvGone::EStatus status) {
LOG_I("Leave"
<< ": status# " << status);

Send(Worker, new TEvWorker::TEvGone(status));
PassAway();
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
}

void Leave(TEvWorker::TEvGone::EStatus status) {
LOG_I("Leave");
LOG_I("Leave"
<< ": status# " << status);

Send(Worker, new TEvWorker::TEvGone(status));
PassAway();
}
Expand Down
26 changes: 19 additions & 7 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,15 @@ class TWorker: public TActorBootstrapped<TWorker> {
}
[[fallthrough]];
default:
return Leave();
return Leave(status);
}
}

void Leave() {
// TODO: signal to parent
void Leave(TEvWorker::TEvGone::EStatus status) {
LOG_I("Leave"
<< ": status# " << status);

Send(Parent, new TEvWorker::TEvGone(status));
PassAway();
}

Expand All @@ -213,8 +216,12 @@ class TWorker: public TActorBootstrapped<TWorker> {
return NKikimrServices::TActivity::REPLICATION_WORKER;
}

explicit TWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn)
: Reader(std::move(createReaderFn))
explicit TWorker(
const TActorId& parent,
std::function<IActor*(void)>&& createReaderFn,
std::function<IActor*(void)>&& createWriterFn)
: Parent(parent)
, Reader(std::move(createReaderFn))
, Writer(std::move(createWriterFn))
{
}
Expand All @@ -239,14 +246,19 @@ class TWorker: public TActorBootstrapped<TWorker> {

private:
static constexpr ui32 MaxAttempts = 3;
const TActorId Parent;
mutable TMaybe<TString> LogPrefix;
TActorInfo Reader;
TActorInfo Writer;
THolder<TEvWorker::TEvData> InFlightData;
};

IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn) {
return new TWorker(std::move(createReaderFn), std::move(createWriterFn));
IActor* CreateWorker(
const TActorId& parent,
std::function<IActor*(void)>&& createReaderFn,
std::function<IActor*(void)>&& createWriterFn)
{
return new TWorker(parent, std::move(createReaderFn), std::move(createWriterFn));
}

}
5 changes: 4 additions & 1 deletion ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ struct TEvWorker {
};
};

IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn);
IActor* CreateWorker(
const TActorId& parent,
std::function<IActor*(void)>&& createReaderFn,
std::function<IActor*(void)>&& createWriterFn);

}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/service/worker_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(Worker) {
return CreateLocalTableWriter(tablePathId);
};

auto worker = env.GetRuntime().Register(CreateWorker(std::move(createReaderFn), std::move(createWriterFn)));
auto worker = env.GetRuntime().Register(CreateWorker(env.GetSender(), std::move(createReaderFn), std::move(createWriterFn)));
Y_UNUSED(worker);

UNIT_ASSERT(WriteTopic(env, "/Root/topic", R"({"key":[1], "update":{"value":"10"}})"));
Expand Down