Skip to content

Commit 4eb59fa

Browse files
committed
Alter replication config during replication alter (ydb-platform#4696)
1 parent f4c9285 commit 4eb59fa

File tree

14 files changed

+399
-4
lines changed

14 files changed

+399
-4
lines changed

ydb/core/protos/counters_replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ enum ETxTypes {
3535
TXTYPE_DROP_DST_RESULT = 9 [(TxTypeOpts) = {Name: "TxDropDstResult"}];
3636
TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}];
3737
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
38+
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
3839
}

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ STFUNC(TController::StateWork) {
5151
HFunc(TEvPrivate::TEvCreateStreamResult, Handle);
5252
HFunc(TEvPrivate::TEvDropStreamResult, Handle);
5353
HFunc(TEvPrivate::TEvCreateDstResult, Handle);
54+
HFunc(TEvPrivate::TEvAlterDstResult, Handle);
5455
HFunc(TEvPrivate::TEvDropDstResult, Handle);
5556
HFunc(TEvPrivate::TEvResolveSecretResult, Handle);
5657
HFunc(TEvPrivate::TEvResolveTenantResult, Handle);
@@ -148,6 +149,11 @@ void TController::Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorC
148149
RunTxCreateDstResult(ev, ctx);
149150
}
150151

152+
void TController::Handle(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx) {
153+
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
154+
RunTxAlterDstResult(ev, ctx);
155+
}
156+
151157
void TController::Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx) {
152158
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
153159
RunTxDropDstResult(ev, ctx);

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TController
7474
void Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
7575
void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
7676
void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
77+
void Handle(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx);
7778
void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
7879
void Handle(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
7980
void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx);
@@ -103,6 +104,7 @@ class TController
103104
class TTxCreateStreamResult;
104105
class TTxDropStreamResult;
105106
class TTxCreateDstResult;
107+
class TTxAlterDstResult;
106108
class TTxDropDstResult;
107109
class TTxResolveSecretResult;
108110

@@ -118,6 +120,7 @@ class TController
118120
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
119121
void RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
120122
void RunTxCreateDstResult(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
123+
void RunTxAlterDstResult(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx);
121124
void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
122125
void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
123126

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
#include "dst_alterer.h"
2+
#include "logging.h"
3+
#include "private_events.h"
4+
5+
#include <ydb/core/base/tablet_pipecache.h>
6+
#include <ydb/core/tx/schemeshard/schemeshard.h>
7+
#include <ydb/core/tx/tx_proxy/proxy.h>
8+
#include <ydb/library/actors/core/actor_bootstrapped.h>
9+
#include <ydb/library/actors/core/hfunc.h>
10+
11+
namespace NKikimr::NReplication::NController {
12+
13+
using namespace NSchemeShard;
14+
15+
class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
16+
void AllocateTxId() {
17+
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
18+
Become(&TThis::StateAllocateTxId);
19+
}
20+
21+
STATEFN(StateAllocateTxId) {
22+
switch (ev->GetTypeRewrite()) {
23+
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
24+
default:
25+
return StateBase(ev);
26+
}
27+
}
28+
29+
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
30+
LOG_T("Handle " << ev->Get()->ToString());
31+
32+
TxId = ev->Get()->TxId;
33+
PipeCache = ev->Get()->Services.LeaderPipeCache;
34+
AlterDst();
35+
}
36+
37+
void AlterDst() {
38+
auto ev = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId);
39+
auto& tx = *ev->Record.AddTransaction();
40+
tx.SetInternal(true);
41+
42+
switch (Kind) {
43+
case TReplication::ETargetKind::Table:
44+
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
45+
PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId());
46+
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(
47+
NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE);
48+
break;
49+
}
50+
51+
Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true));
52+
Become(&TThis::StateAlterDst);
53+
}
54+
55+
STATEFN(StateAlterDst) {
56+
switch (ev->GetTypeRewrite()) {
57+
hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
58+
hFunc(TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
59+
sFunc(TEvents::TEvWakeup, AllocateTxId);
60+
default:
61+
return StateBase(ev);
62+
}
63+
}
64+
65+
void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
66+
LOG_T("Handle " << ev->Get()->ToString());
67+
const auto& record = ev->Get()->Record;
68+
69+
switch (record.GetStatus()) {
70+
case NKikimrScheme::StatusAccepted:
71+
Y_DEBUG_ABORT_UNLESS(TxId == record.GetTxId());
72+
return SubscribeTx(record.GetTxId());
73+
case NKikimrScheme::StatusMultipleModifications:
74+
return Retry();
75+
default:
76+
return Error(record.GetStatus(), record.GetReason());
77+
}
78+
}
79+
80+
void SubscribeTx(ui64 txId) {
81+
LOG_D("Subscribe tx"
82+
<< ": txId# " << txId);
83+
Send(PipeCache, new TEvPipeCache::TEvForward(new TEvSchemeShard::TEvNotifyTxCompletion(txId), SchemeShardId));
84+
}
85+
86+
void Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
87+
LOG_T("Handle " << ev->Get()->ToString());
88+
Success();
89+
}
90+
91+
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
92+
LOG_T("Handle " << ev->Get()->ToString());
93+
94+
if (SchemeShardId == ev->Get()->TabletId) {
95+
return;
96+
}
97+
98+
Retry();
99+
}
100+
101+
void Handle(TEvents::TEvUndelivered::TPtr& ev) {
102+
LOG_T("Handle " << ev->Get()->ToString());
103+
Retry();
104+
}
105+
106+
void Success() {
107+
LOG_I("Success");
108+
109+
Send(Parent, new TEvPrivate::TEvAlterDstResult(ReplicationId, TargetId));
110+
PassAway();
111+
}
112+
113+
void Error(NKikimrScheme::EStatus status, const TString& error) {
114+
LOG_E("Error"
115+
<< ": status# " << status
116+
<< ", reason# " << error);
117+
118+
Send(Parent, new TEvPrivate::TEvAlterDstResult(ReplicationId, TargetId, status, error));
119+
PassAway();
120+
}
121+
122+
void Retry() {
123+
LOG_D("Retry");
124+
Schedule(RetryInterval, new TEvents::TEvWakeup);
125+
}
126+
127+
public:
128+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
129+
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_DST_ALTERER;
130+
}
131+
132+
explicit TDstAlterer(
133+
const TActorId& parent,
134+
ui64 schemeShardId,
135+
ui64 rid,
136+
ui64 tid,
137+
TReplication::ETargetKind kind,
138+
const TPathId& dstPathId)
139+
: Parent(parent)
140+
, SchemeShardId(schemeShardId)
141+
, ReplicationId(rid)
142+
, TargetId(tid)
143+
, Kind(kind)
144+
, DstPathId(dstPathId)
145+
, LogPrefix("DstAlterer", ReplicationId, TargetId)
146+
{
147+
}
148+
149+
void Bootstrap() {
150+
if (!DstPathId) {
151+
Success();
152+
} else {
153+
AllocateTxId();
154+
}
155+
}
156+
157+
STATEFN(StateBase) {
158+
switch (ev->GetTypeRewrite()) {
159+
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
160+
hFunc(TEvents::TEvUndelivered, Handle);
161+
sFunc(TEvents::TEvPoison, PassAway);
162+
}
163+
}
164+
165+
private:
166+
const TActorId Parent;
167+
const ui64 SchemeShardId;
168+
const ui64 ReplicationId;
169+
const ui64 TargetId;
170+
const TReplication::ETargetKind Kind;
171+
const TPathId DstPathId;
172+
const TActorLogPrefix LogPrefix;
173+
174+
ui64 TxId = 0;
175+
TActorId PipeCache;
176+
static constexpr auto RetryInterval = TDuration::Seconds(10);
177+
178+
}; // TDstAlterer
179+
180+
IActor* CreateDstAlterer(TReplication::TPtr replication, ui64 targetId, const TActorContext& ctx) {
181+
const auto* target = replication->FindTarget(targetId);
182+
Y_ABORT_UNLESS(target);
183+
return CreateDstAlterer(ctx.SelfID, replication->GetSchemeShardId(),
184+
replication->GetId(), target->GetId(), target->GetKind(), target->GetDstPathId());
185+
}
186+
187+
IActor* CreateDstAlterer(const TActorId& parent, ui64 schemeShardId,
188+
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TPathId& dstPathId)
189+
{
190+
return new TDstAlterer(parent, schemeShardId, rid, tid, kind, dstPathId);
191+
}
192+
193+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include "replication.h"
4+
5+
namespace NKikimr::NReplication::NController {
6+
7+
IActor* CreateDstAlterer(TReplication::TPtr replication, ui64 targetId, const TActorContext& ctx);
8+
IActor* CreateDstAlterer(const TActorId& parent, ui64 schemeShardId,
9+
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TPathId& dstPathId);
10+
11+
}

ydb/core/tx/replication/controller/private_events.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ bool TEvPrivate::TEvResolveSecretResult::IsSuccess() const {
143143
return Success;
144144
}
145145

146+
TEvPrivate::TEvAlterDstResult::TEvAlterDstResult(ui64 rid, ui64 tid, NKikimrScheme::EStatus status, const TString& error)
147+
: TBase(rid, tid, status, error)
148+
{
149+
}
150+
151+
TString TEvPrivate::TEvAlterDstResult::ToString() const {
152+
return TStringBuilder() << ToStringHeader() << " {" << ToStringBody() << " }";
153+
}
154+
146155
}
147156

148157
Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) {

ydb/core/tx/replication/controller/private_events.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ struct TEvPrivate {
2222
EvUpdateTenantNodes,
2323
EvRunWorkers,
2424
EvResolveSecretResult,
25+
EvAlterDstResult,
2526

2627
EvEnd,
2728
};
@@ -175,6 +176,12 @@ struct TEvPrivate {
175176
bool IsSuccess() const;
176177
};
177178

179+
struct TEvAlterDstResult: public TGenericSchemeResult<TEvAlterDstResult, EvAlterDstResult> {
180+
explicit TEvAlterDstResult(ui64 rid, ui64 tid,
181+
NKikimrScheme::EStatus status = NKikimrScheme::StatusSuccess, const TString& error = {});
182+
TString ToString() const override;
183+
};
184+
178185
}; // TEvPrivate
179186

180187
}

ydb/core/tx/replication/controller/replication.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
3434
Creating,
3535
Syncing,
3636
Ready,
37+
Alter,
38+
Done,
3739
Removing,
3840
Error = 255
3941
};

ydb/core/tx/replication/controller/target_base.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "dst_alterer.h"
12
#include "dst_creator.h"
23
#include "dst_remover.h"
34
#include "target_base.h"
@@ -90,6 +91,13 @@ void TTargetBase::Progress(TReplication::TPtr replication, const TActorContext&
9091
WorkerRegistar = ctx.Register(CreateWorkerRegistar(replication, ctx));
9192
}
9293
break;
94+
case EDstState::Alter:
95+
if (!DstAlterer) {
96+
DstAlterer = ctx.Register(CreateDstAlterer(replication, Id, ctx));
97+
}
98+
break;
99+
case EDstState::Done:
100+
break;
93101
case EDstState::Removing:
94102
if (!DstRemover) {
95103
DstRemover = ctx.Register(CreateDstRemover(replication, Id, ctx));
@@ -101,7 +109,14 @@ void TTargetBase::Progress(TReplication::TPtr replication, const TActorContext&
101109
}
102110

103111
void TTargetBase::Shutdown(const TActorContext& ctx) {
104-
for (auto* x : TVector<TActorId*>{&DstCreator, &DstRemover, &WorkerRegistar}) {
112+
TVector<TActorId*> toShutdown = {
113+
&DstCreator,
114+
&DstAlterer,
115+
&DstRemover,
116+
&WorkerRegistar,
117+
};
118+
119+
for (auto* x : toShutdown) {
105120
if (auto actorId = std::exchange(*x, {})) {
106121
ctx.Send(actorId, new TEvents::TEvPoison());
107122
}

ydb/core/tx/replication/controller/target_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class TTargetBase: public TReplication::ITarget {
5050
TString Issue;
5151

5252
TActorId DstCreator;
53+
TActorId DstAlterer;
5354
TActorId DstRemover;
5455
TActorId WorkerRegistar;
5556

0 commit comments

Comments
 (0)