Skip to content

Commit ed7cb89

Browse files
committed
Switch replication to 'Done' state
1 parent bd36f9e commit ed7cb89

File tree

10 files changed

+89
-44
lines changed

10 files changed

+89
-44
lines changed

ydb/core/tx/replication/controller/replication.cpp

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <ydb/library/yverify_stream/yverify_stream.h>
1313

1414
#include <util/generic/hash.h>
15+
#include <util/generic/hash_set.h>
1516
#include <util/generic/ptr.h>
1617

1718
namespace NKikimr::NReplication::NController {
@@ -28,10 +29,10 @@ class TReplication::TImpl {
2829
}
2930

3031
template <typename... Args>
31-
ITarget* CreateTarget(ui64 id, ETargetKind kind, Args&&... args) const {
32+
ITarget* CreateTarget(TReplication::TPtr self, ui64 id, ETargetKind kind, Args&&... args) const {
3233
switch (kind) {
3334
case ETargetKind::Table:
34-
return new TTableTarget(id, std::forward<Args>(args)...);
35+
return new TTableTarget(self, id, std::forward<Args>(args)...);
3536
}
3637
}
3738

@@ -59,9 +60,9 @@ class TReplication::TImpl {
5960
}
6061
}
6162

62-
void ProgressTargets(TReplication::TPtr self, const TActorContext& ctx) {
63+
void ProgressTargets(const TActorContext& ctx) {
6364
for (auto& [_, target] : Targets) {
64-
target->Progress(self, ctx);
65+
target->Progress(ctx);
6566
}
6667
}
6768

@@ -75,15 +76,15 @@ class TReplication::TImpl {
7576
}
7677

7778
template <typename... Args>
78-
ui64 AddTarget(ui64 id, ETargetKind kind, Args&&... args) {
79-
const auto res = Targets.emplace(id, CreateTarget(id, kind, std::forward<Args>(args)...));
79+
ui64 AddTarget(TReplication::TPtr self, ui64 id, ETargetKind kind, Args&&... args) {
80+
const auto res = Targets.emplace(id, CreateTarget(self, id, kind, std::forward<Args>(args)...));
8081
Y_VERIFY_S(res.second, "Duplicate target: " << id);
8182
return id;
8283
}
8384

8485
template <typename... Args>
85-
ui64 AddTarget(ETargetKind kind, Args&&... args) {
86-
return AddTarget(NextTargetId++, kind, std::forward<Args>(args)...);
86+
ui64 AddTarget(TReplication::TPtr self, ETargetKind kind, Args&&... args) {
87+
return AddTarget(self, NextTargetId++, kind, std::forward<Args>(args)...);
8788
}
8889

8990
ITarget* FindTarget(ui64 id) {
@@ -97,7 +98,7 @@ class TReplication::TImpl {
9798
Targets.erase(id);
9899
}
99100

100-
void Progress(TReplication::TPtr self, const TActorContext& ctx) {
101+
void Progress(const TActorContext& ctx) {
101102
if (!YdbProxy) {
102103
THolder<IActor> ydbProxy;
103104
const auto& params = Config.GetSrcConnectionParams();
@@ -136,14 +137,15 @@ class TReplication::TImpl {
136137
if (!Targets) {
137138
return DiscoverTargets(ctx);
138139
} else {
139-
return ProgressTargets(self, ctx);
140+
return ProgressTargets(ctx);
140141
}
141142
case EState::Removing:
142143
if (!Targets) {
143144
return (void)ctx.Send(ctx.SelfID, new TEvPrivate::TEvDropReplication(ReplicationId));
144145
} else {
145-
return ProgressTargets(self, ctx);
146+
return ProgressTargets(ctx);
146147
}
148+
case EState::Done:
147149
case EState::Error:
148150
return;
149151
}
@@ -180,6 +182,7 @@ class TReplication::TImpl {
180182
TString Issue;
181183
ui64 NextTargetId = 1;
182184
THashMap<ui64, THolder<ITarget>> Targets;
185+
THashSet<ui64> PendingAlterTargets;
183186
TActorId SecretResolver;
184187
TActorId YdbProxy;
185188
TActorId TenantResolver;
@@ -209,11 +212,11 @@ TReplication::TReplication(ui64 id, const TPathId& pathId, const TString& config
209212
}
210213

211214
ui64 TReplication::AddTarget(ETargetKind kind, const TString& srcPath, const TString& dstPath) {
212-
return Impl->AddTarget(kind, srcPath, dstPath);
215+
return Impl->AddTarget(this, kind, srcPath, dstPath);
213216
}
214217

215218
TReplication::ITarget* TReplication::AddTarget(ui64 id, ETargetKind kind, const TString& srcPath, const TString& dstPath) {
216-
Impl->AddTarget(id, kind, srcPath, dstPath);
219+
Impl->AddTarget(this, id, kind, srcPath, dstPath);
217220
return Impl->FindTarget(id);
218221
}
219222

@@ -230,7 +233,7 @@ void TReplication::RemoveTarget(ui64 id) {
230233
}
231234

232235
void TReplication::Progress(const TActorContext& ctx) {
233-
Impl->Progress(this, ctx);
236+
Impl->Progress(ctx);
234237
}
235238

236239
void TReplication::Shutdown(const TActorContext& ctx) {
@@ -308,6 +311,18 @@ const std::optional<TReplication::TDropOp>& TReplication::GetDropOp() const {
308311
return DropOp;
309312
}
310313

314+
void TReplication::AddPendingAlterTarget(ui64 id) {
315+
Impl->PendingAlterTargets.insert(id);
316+
}
317+
318+
void TReplication::RemovePendingAlterTarget(ui64 id) {
319+
Impl->PendingAlterTargets.erase(id);
320+
}
321+
322+
bool TReplication::CheckAlterDone() const {
323+
return Impl->State == EState::Ready && Impl->PendingAlterTargets.empty();
324+
}
325+
311326
}
312327

313328
Y_DECLARE_OUT_SPEC(, NKikimrReplication::TReplicationConfig::TargetCase, stream, value) {

ydb/core/tx/replication/controller/replication.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
2222

2323
enum class EState: ui8 {
2424
Ready,
25+
Done,
2526
Removing,
2627
Error = 255
2728
};
@@ -73,13 +74,17 @@ class TReplication: public TSimpleRefCount<TReplication> {
7374
virtual const TString& GetIssue() const = 0;
7475
virtual void SetIssue(const TString& value) = 0;
7576

76-
virtual void Progress(TReplication::TPtr replication, const TActorContext& ctx) = 0;
77+
virtual void Progress(const TActorContext& ctx) = 0;
7778
virtual void Shutdown(const TActorContext& ctx) = 0;
7879

7980
protected:
80-
virtual IActor* CreateWorkerRegistar(TReplication::TPtr replication, const TActorContext& ctx) const = 0;
81+
virtual IActor* CreateWorkerRegistar(const TActorContext& ctx) const = 0;
8182
};
8283

84+
friend class TTargetBase;
85+
void AddPendingAlterTarget(ui64 id);
86+
void RemovePendingAlterTarget(ui64 id);
87+
8388
struct TDropOp {
8489
TActorId Sender;
8590
std::pair<ui64, ui32> OperationId; // txId, partId
@@ -116,6 +121,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
116121
void SetTenant(const TString& value);
117122
const TString& GetTenant() const;
118123

124+
bool CheckAlterDone() const;
125+
119126
void SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId);
120127
const std::optional<TDropOp>& GetDropOp() const;
121128

ydb/core/tx/replication/controller/target_base.cpp

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ using ETargetKind = TReplication::ETargetKind;
1212
using EDstState = TReplication::EDstState;
1313
using EStreamState = TReplication::EStreamState;
1414

15-
TTargetBase::TTargetBase(ETargetKind kind, ui64 id, const TString& srcPath, const TString& dstPath)
16-
: Id(id)
15+
TTargetBase::TTargetBase(TReplication::TPtr replication, ETargetKind kind,
16+
ui64 id, const TString& srcPath, const TString& dstPath)
17+
: Replication(replication)
18+
, Id(id)
1719
, Kind(kind)
1820
, SrcPath(srcPath)
1921
, DstPath(dstPath)
@@ -42,6 +44,14 @@ EDstState TTargetBase::GetDstState() const {
4244

4345
void TTargetBase::SetDstState(const EDstState value) {
4446
DstState = value;
47+
switch (DstState) {
48+
case EDstState::Alter:
49+
return Replication->AddPendingAlterTarget(Id);
50+
case EDstState::Done:
51+
return Replication->RemovePendingAlterTarget(Id);
52+
default:
53+
break;
54+
}
4555
}
4656

4757
const TPathId& TTargetBase::GetDstPathId() const {
@@ -77,30 +87,30 @@ void TTargetBase::SetIssue(const TString& value) {
7787
TruncatedIssue(Issue);
7888
}
7989

80-
void TTargetBase::Progress(TReplication::TPtr replication, const TActorContext& ctx) {
90+
void TTargetBase::Progress(const TActorContext& ctx) {
8191
switch (DstState) {
8292
case EDstState::Creating:
8393
if (!DstCreator) {
84-
DstCreator = ctx.Register(CreateDstCreator(replication, Id, ctx));
94+
DstCreator = ctx.Register(CreateDstCreator(Replication, Id, ctx));
8595
}
8696
break;
8797
case EDstState::Syncing:
8898
break; // TODO
8999
case EDstState::Ready:
90100
if (!WorkerRegistar) {
91-
WorkerRegistar = ctx.Register(CreateWorkerRegistar(replication, ctx));
101+
WorkerRegistar = ctx.Register(CreateWorkerRegistar(ctx));
92102
}
93103
break;
94104
case EDstState::Alter:
95105
if (!DstAlterer) {
96-
DstAlterer = ctx.Register(CreateDstAlterer(replication, Id, ctx));
106+
DstAlterer = ctx.Register(CreateDstAlterer(Replication, Id, ctx));
97107
}
98108
break;
99109
case EDstState::Done:
100110
break;
101111
case EDstState::Removing:
102112
if (!DstRemover) {
103-
DstRemover = ctx.Register(CreateDstRemover(replication, Id, ctx));
113+
DstRemover = ctx.Register(CreateDstRemover(Replication, Id, ctx));
104114
}
105115
break;
106116
case EDstState::Error:

ydb/core/tx/replication/controller/target_base.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@ class TTargetBase: public TReplication::ITarget {
1010
using EDstState = TReplication::EDstState;
1111
using EStreamState = TReplication::EStreamState;
1212

13+
inline TReplication::TPtr GetReplication() const {
14+
return Replication;
15+
}
16+
1317
public:
14-
explicit TTargetBase(ETargetKind kind, ui64 id, const TString& srcPath, const TString& dstPath);
18+
explicit TTargetBase(TReplication::TPtr replication, ETargetKind kind,
19+
ui64 id, const TString& srcPath, const TString& dstPath);
1520

1621
ui64 GetId() const override;
1722
ETargetKind GetKind() const override;
@@ -34,10 +39,11 @@ class TTargetBase: public TReplication::ITarget {
3439
const TString& GetIssue() const override;
3540
void SetIssue(const TString& value) override;
3641

37-
void Progress(TReplication::TPtr replication, const TActorContext& ctx) override;
42+
void Progress(const TActorContext& ctx) override;
3843
void Shutdown(const TActorContext& ctx) override;
3944

4045
private:
46+
TReplication::TPtr Replication;
4147
const ui64 Id;
4248
const ETargetKind Kind;
4349
const TString SrcPath;

ydb/core/tx/replication/controller/target_table.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,13 @@ class TTableWorkerRegistar: public TActorBootstrapped<TTableWorkerRegistar> {
102102

103103
}; // TTableWorkerRegistar
104104

105-
TTableTarget::TTableTarget(ui64 id, const TString& srcPath, const TString& dstPath)
106-
: TTargetWithStream(ETargetKind::Table, id, srcPath, dstPath)
105+
TTableTarget::TTableTarget(TReplication::TPtr replication, ui64 id, const TString& srcPath, const TString& dstPath)
106+
: TTargetWithStream(replication, ETargetKind::Table, id, srcPath, dstPath)
107107
{
108108
}
109109

110-
IActor* TTableTarget::CreateWorkerRegistar(TReplication::TPtr replication, const TActorContext& ctx) const {
110+
IActor* TTableTarget::CreateWorkerRegistar(const TActorContext& ctx) const {
111+
auto replication = GetReplication();
111112
return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(),
112113
replication->GetConfig().GetSrcConnectionParams(), replication->GetId(), GetId(),
113114
CanonizePath(ChildPath(SplitPath(GetSrcPath()), GetStreamName())), GetDstPathId());

ydb/core/tx/replication/controller/target_table.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ namespace NKikimr::NReplication::NController {
66

77
class TTableTarget: public TTargetWithStream {
88
public:
9-
explicit TTableTarget(ui64 id, const TString& srcPath, const TString& dstPath);
9+
explicit TTableTarget(TReplication::TPtr replication,
10+
ui64 id, const TString& srcPath, const TString& dstPath);
1011

1112
protected:
12-
IActor* CreateWorkerRegistar(TReplication::TPtr replication, const TActorContext& ctx) const override;
13+
IActor* CreateWorkerRegistar(const TActorContext& ctx) const override;
1314

1415
}; // TTableTarget
1516

ydb/core/tx/replication/controller/target_with_stream.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ namespace NKikimr::NReplication::NController {
99

1010
const TString ReplicationConsumerName = "replicationConsumer";
1111

12-
void TTargetWithStream::Progress(TReplication::TPtr replication, const TActorContext& ctx) {
12+
void TTargetWithStream::Progress(const TActorContext& ctx) {
13+
auto replication = GetReplication();
14+
1315
switch (GetStreamState()) {
1416
case EStreamState::Creating:
1517
if (GetStreamName().empty() && !NameAssignmentInProcess) {
@@ -30,7 +32,7 @@ void TTargetWithStream::Progress(TReplication::TPtr replication, const TActorCon
3032
break;
3133
}
3234

33-
TTargetBase::Progress(replication, ctx);
35+
TTargetBase::Progress(ctx);
3436
}
3537

3638
void TTargetWithStream::Shutdown(const TActorContext& ctx) {

ydb/core/tx/replication/controller/target_with_stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class TTargetWithStream: public TTargetBase {
1515
SetStreamState(EStreamState::Creating);
1616
}
1717

18-
void Progress(TReplication::TPtr replication, const TActorContext& ctx) override;
18+
void Progress(const TActorContext& ctx) override;
1919
void Shutdown(const TActorContext& ctx) override;
2020

2121
private:

ydb/core/tx/replication/controller/tx_alter_dst_result.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ namespace NKikimr::NReplication::NController {
44

55
class TController::TTxAlterDstResult: public TTxBase {
66
TEvPrivate::TEvAlterDstResult::TPtr Ev;
7-
TReplication::TPtr Replication;
87

98
public:
109
explicit TTxAlterDstResult(TController* self, TEvPrivate::TEvAlterDstResult::TPtr& ev)
@@ -23,14 +22,14 @@ class TController::TTxAlterDstResult: public TTxBase {
2322
const auto rid = Ev->Get()->ReplicationId;
2423
const auto tid = Ev->Get()->TargetId;
2524

26-
Replication = Self->Find(rid);
27-
if (!Replication) {
25+
auto replication = Self->Find(rid);
26+
if (!replication) {
2827
CLOG_W(ctx, "Unknown replication"
2928
<< ": rid# " << rid);
3029
return true;
3130
}
3231

33-
auto* target = Replication->FindTarget(tid);
32+
auto* target = replication->FindTarget(tid);
3433
if (!target) {
3534
CLOG_W(ctx, "Unknown target"
3635
<< ": rid# " << rid
@@ -52,8 +51,14 @@ class TController::TTxAlterDstResult: public TTxBase {
5251
CLOG_N(ctx, "Target dst altered"
5352
<< ": rid# " << rid
5453
<< ", tid# " << tid);
54+
55+
if (replication->CheckAlterDone()) {
56+
CLOG_N(ctx, "Replication altered"
57+
<< ": rid# " << rid);
58+
replication->SetState(TReplication::EState::Done);
59+
}
5560
} else {
56-
Replication->SetState(TReplication::EState::Error);
61+
replication->SetState(TReplication::EState::Error);
5762
target->SetDstState(TReplication::EDstState::Error);
5863
target->SetIssue(TStringBuilder() << "Alter dst error"
5964
<< ": " << NKikimrScheme::EStatus_Name(Ev->Get()->Status)
@@ -68,7 +73,7 @@ class TController::TTxAlterDstResult: public TTxBase {
6873

6974
NIceDb::TNiceDb db(txc.DB);
7075
db.Table<Schema::Replications>().Key(rid).Update(
71-
NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState())
76+
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState())
7277
);
7378
db.Table<Schema::Targets>().Key(rid, tid).Update(
7479
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
@@ -80,10 +85,6 @@ class TController::TTxAlterDstResult: public TTxBase {
8085

8186
void Complete(const TActorContext& ctx) override {
8287
CLOG_D(ctx, "Complete");
83-
84-
if (Replication) {
85-
Replication->Progress(ctx);
86-
}
8788
}
8889

8990
}; // TTxAlterDstResult

ydb/core/tx/replication/controller/tx_describe_replication.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,15 @@ class TController::TTxDescribeReplication: public TTxBase {
5656
case TReplication::EState::Removing:
5757
state.MutableStandBy();
5858
break;
59+
case TReplication::EState::Done:
60+
state.MutableDone();
61+
break;
5962
case TReplication::EState::Error:
6063
if (auto issue = state.MutableError()->AddIssues()) {
6164
issue->set_severity(NYql::TSeverityIds::S_ERROR);
6265
issue->set_message(replication->GetIssue());
6366
}
6467
break;
65-
// TODO: 'done' state
6668
}
6769

6870
return true;

0 commit comments

Comments
 (0)