Skip to content

Commit 4848b75

Browse files
committed
Introduce DescribeReplication (ydb-platform#4904)
1 parent 816240e commit 4848b75

File tree

27 files changed

+924
-3
lines changed

27 files changed

+924
-3
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
#include <ydb/services/persqueue_v1/persqueue.h>
107107
#include <ydb/services/persqueue_v1/topic.h>
108108
#include <ydb/services/rate_limiter/grpc_service.h>
109+
#include <ydb/services/replication/grpc_service.h>
109110
#include <ydb/services/ydb/ydb_clickhouse_internal.h>
110111
#include <ydb/services/ydb/ydb_dummy.h>
111112
#include <ydb/services/ydb/ydb_export.h>
@@ -583,6 +584,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
583584
names["query_service"] = &hasQueryService;
584585
TServiceCfg hasKeyValue = services.empty();
585586
names["keyvalue"] = &hasKeyValue;
587+
TServiceCfg hasReplication = services.empty();
588+
names["replication"] = &hasReplication;
586589

587590
std::unordered_set<TString> enabled;
588591
for (const auto& name : services) {
@@ -849,6 +852,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
849852
grpcRequestProxies[0]));
850853
}
851854

855+
if (hasReplication) {
856+
server.AddService(new NGRpcService::TGRpcReplicationService(ActorSystem.Get(), Counters,
857+
grpcRequestProxies[0], hasReplication.IsRlAllowed()));
858+
}
859+
852860
if (ModuleFactories) {
853861
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
854862
server.AddService(service);

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ PEERDIR(
159159
ydb/services/deprecated/persqueue_v0
160160
ydb/services/persqueue_v1
161161
ydb/services/rate_limiter
162+
ydb/services/replication
162163
ydb/services/ydb
163164
)
164165

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
#include "rpc_scheme_base.h"
2+
#include "service_replication.h"
3+
4+
#include <ydb/core/grpc_services/base/base.h>
5+
#include <ydb/core/tx/replication/controller/public_events.h>
6+
#include <ydb/core/tx/schemeshard/schemeshard.h>
7+
#include <ydb/core/ydb_convert/ydb_convert.h>
8+
#include <ydb/library/actors/core/actor.h>
9+
#include <ydb/library/actors/core/hfunc.h>
10+
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>
11+
12+
namespace NKikimr::NGRpcService {
13+
14+
using namespace Ydb;
15+
16+
using TEvDescribeReplication = TGrpcRequestOperationCall<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse>;
17+
18+
class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication> {
19+
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication>;
20+
21+
public:
22+
using TBase::TBase;
23+
24+
void Bootstrap() {
25+
DescribeScheme();
26+
}
27+
28+
void PassAway() override {
29+
if (ControllerPipeClient) {
30+
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient);
31+
}
32+
33+
TBase::PassAway();
34+
}
35+
36+
private:
37+
void DescribeScheme() {
38+
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
39+
SetAuthToken(ev, *Request_);
40+
SetDatabase(ev.get(), *Request_);
41+
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());
42+
43+
Send(MakeTxProxyID(), ev.release());
44+
Become(&TDescribeReplicationRPC::StateDescribeScheme);
45+
}
46+
47+
STATEFN(StateDescribeScheme) {
48+
switch (ev->GetTypeRewrite()) {
49+
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
50+
default:
51+
return TBase::StateWork(ev);
52+
}
53+
}
54+
55+
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
56+
const auto& record = ev->Get()->GetRecord();
57+
const auto& desc = record.GetPathDescription();
58+
59+
if (record.HasReason()) {
60+
auto issue = NYql::TIssue(record.GetReason());
61+
Request_->RaiseIssue(issue);
62+
}
63+
64+
switch (record.GetStatus()) {
65+
case NKikimrScheme::StatusSuccess:
66+
if (desc.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeReplication) {
67+
auto issue = NYql::TIssue("Is not a replication");
68+
Request_->RaiseIssue(issue);
69+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
70+
}
71+
72+
ConvertDirectoryEntry(desc.GetSelf(), Result.mutable_self(), true);
73+
return DescribeReplication(desc.GetReplicationDescription().GetControllerId(),
74+
PathIdFromPathId(desc.GetReplicationDescription().GetPathId()));
75+
76+
case NKikimrScheme::StatusPathDoesNotExist:
77+
case NKikimrScheme::StatusSchemeError:
78+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
79+
80+
case NKikimrScheme::StatusAccessDenied:
81+
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
82+
83+
case NKikimrScheme::StatusNotAvailable:
84+
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
85+
86+
default: {
87+
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
88+
}
89+
}
90+
}
91+
92+
void DescribeReplication(ui64 tabletId, const TPathId& pathId) {
93+
if (!ControllerPipeClient) {
94+
NTabletPipe::TClientConfig config;
95+
config.RetryPolicy = {
96+
.RetryLimitCount = 3,
97+
};
98+
ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config));
99+
}
100+
101+
auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
102+
PathIdFromPathId(pathId, ev->Record.MutablePathId());
103+
104+
NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
105+
Become(&TDescribeReplicationRPC::StateDescribeReplication);
106+
}
107+
108+
STATEFN(StateDescribeReplication) {
109+
switch (ev->GetTypeRewrite()) {
110+
HFunc(NReplication::TEvController::TEvDescribeReplicationResult, Handle);
111+
default:
112+
return TBase::StateWork(ev);
113+
}
114+
}
115+
116+
void Handle(NReplication::TEvController::TEvDescribeReplicationResult::TPtr& ev, const TActorContext& ctx) {
117+
auto& record = ev->Get()->Record;
118+
119+
switch (record.GetStatus()) {
120+
case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS:
121+
break;
122+
case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND:
123+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
124+
default:
125+
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
126+
}
127+
128+
ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params());
129+
ConvertState(*record.MutableState(), Result);
130+
131+
for (const auto& target : record.GetTargets()) {
132+
ConvertItem(target, *Result.add_items());
133+
}
134+
135+
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
136+
}
137+
138+
static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
139+
to.set_endpoint(from.GetEndpoint());
140+
to.set_database(from.GetDatabase());
141+
142+
switch (from.GetCredentialsCase()) {
143+
case NKikimrReplication::TConnectionParams::kStaticCredentials:
144+
return ConvertStaticCredentials(from.GetStaticCredentials(), *to.mutable_static_credentials());
145+
case NKikimrReplication::TConnectionParams::kOAuthToken:
146+
return ConvertOAuth(from.GetOAuthToken(), *to.mutable_oauth());
147+
default:
148+
break;
149+
}
150+
}
151+
152+
static void ConvertStaticCredentials(const NKikimrReplication::TStaticCredentials& from, Ydb::Replication::ConnectionParams::StaticCredentials& to) {
153+
to.set_user(from.GetUser());
154+
to.set_password_secret_name(from.GetPasswordSecretName());
155+
}
156+
157+
static void ConvertOAuth(const NKikimrReplication::TOAuthToken& from, Ydb::Replication::ConnectionParams::OAuth& to) {
158+
to.set_token_secret_name(from.GetTokenSecretName());
159+
}
160+
161+
static void ConvertItem(const NKikimrReplication::TReplicationConfig::TTargetSpecific::TTarget& from, Ydb::Replication::DescribeReplicationResult::Item& to) {
162+
to.set_source_path(from.GetSrcPath());
163+
to.set_destination_path(from.GetDstPath());
164+
}
165+
166+
static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
167+
switch (from.GetStateCase()) {
168+
case NKikimrReplication::TReplicationState::kStandBy:
169+
to.mutable_running();
170+
break;
171+
case NKikimrReplication::TReplicationState::kError:
172+
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
173+
break;
174+
case NKikimrReplication::TReplicationState::kDone:
175+
to.mutable_done();
176+
break;
177+
default:
178+
break;
179+
}
180+
}
181+
182+
private:
183+
Ydb::Replication::DescribeReplicationResult Result;
184+
TActorId ControllerPipeClient;
185+
};
186+
187+
void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
188+
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
189+
}
190+
191+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include <memory>
4+
5+
namespace NKikimr::NGRpcService {
6+
7+
class IRequestOpCtx;
8+
class IFacilityProvider;
9+
10+
void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
11+
12+
}

ydb/core/grpc_services/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ SRCS(
6767
rpc_read_rows.cpp
6868
rpc_remove_directory.cpp
6969
rpc_rename_tables.cpp
70+
rpc_replication.cpp
7071
rpc_rollback_transaction.cpp
7172
rpc_scheme_base.cpp
7273
rpc_stream_execute_scan_query.cpp

ydb/core/protos/counters_replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@ enum ETxTypes {
3636
TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}];
3737
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
3838
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
39+
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
3940
}

ydb/core/protos/replication.proto

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import "ydb/core/scheme/protos/pathid.proto";
22
import "ydb/public/api/protos/annotations/sensitive.proto";
3+
import "ydb/public/api/protos/ydb_issue_message.proto";
34

45
package NKikimrReplication;
56
option java_package = "ru.yandex.kikimr.proto";
@@ -70,10 +71,15 @@ message TReplicationState {
7071
optional EFailoverMode FailoverMode = 1;
7172
}
7273

74+
message TError {
75+
repeated Ydb.Issue.IssueMessage Issues = 1;
76+
}
77+
7378
oneof State {
7479
TStandBy StandBy = 1;
7580
TPaused Paused = 2;
7681
TDone Done = 3;
82+
TError Error = 4;
7783
}
7884
}
7985

@@ -135,6 +141,23 @@ message TEvDropReplicationResult {
135141
optional EStatus Status = 3;
136142
}
137143

144+
message TEvDescribeReplication {
145+
optional NKikimrProto.TPathID PathId = 1;
146+
}
147+
148+
message TEvDescribeReplicationResult {
149+
enum EStatus {
150+
UNKNOWN = 0;
151+
SUCCESS = 1;
152+
NOT_FOUND = 2;
153+
}
154+
155+
optional EStatus Status = 1;
156+
optional TConnectionParams ConnectionParams = 2;
157+
repeated TReplicationConfig.TTargetSpecific.TTarget Targets = 3;
158+
optional TReplicationState State = 4;
159+
}
160+
138161
message TControllerIdentity {
139162
optional uint64 TabletId = 1;
140163
optional uint64 Generation = 2;

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ STFUNC(TController::StateWork) {
4545
HFunc(TEvController::TEvCreateReplication, Handle);
4646
HFunc(TEvController::TEvAlterReplication, Handle);
4747
HFunc(TEvController::TEvDropReplication, Handle);
48+
HFunc(TEvController::TEvDescribeReplication, Handle);
4849
HFunc(TEvPrivate::TEvDropReplication, Handle);
4950
HFunc(TEvPrivate::TEvDiscoveryTargetsResult, Handle);
5051
HFunc(TEvPrivate::TEvAssignStreamName, Handle);
@@ -124,6 +125,11 @@ void TController::Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorC
124125
RunTxDropReplication(ev, ctx);
125126
}
126127

128+
void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx) {
129+
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
130+
RunTxDescribeReplication(ev, ctx);
131+
}
132+
127133
void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) {
128134
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
129135
RunTxDiscoveryTargetsResult(ev, ctx);

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class TController
6868
void Handle(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx);
6969
void Handle(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx);
7070
void Handle(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
71+
void Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
7172
void Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
7273
void Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
7374
void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
@@ -99,6 +100,7 @@ class TController
99100
class TTxCreateReplication;
100101
class TTxAlterReplication;
101102
class TTxDropReplication;
103+
class TTxDescribeReplication;
102104
class TTxDiscoveryTargetsResult;
103105
class TTxAssignStreamName;
104106
class TTxCreateStreamResult;
@@ -115,6 +117,7 @@ class TController
115117
void RunTxAlterReplication(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx);
116118
void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
117119
void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
120+
void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
118121
void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
119122
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
120123
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/replication/controller/public_events.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ struct TEvController {
1515
EvAlterReplicationResult,
1616
EvDropReplication,
1717
EvDropReplicationResult,
18+
EvDescribeReplication,
19+
EvDescribeReplicationResult,
1820

1921
EvEnd,
2022
};
@@ -46,6 +48,14 @@ struct TEvController {
4648
: public TEventPB<TEvDropReplicationResult, NKikimrReplication::TEvDropReplicationResult, EvDropReplicationResult>
4749
{};
4850

51+
struct TEvDescribeReplication
52+
: public TEventPB<TEvDescribeReplication, NKikimrReplication::TEvDescribeReplication, EvDescribeReplication>
53+
{};
54+
55+
struct TEvDescribeReplicationResult
56+
: public TEventPB<TEvDescribeReplicationResult, NKikimrReplication::TEvDescribeReplicationResult, EvDescribeReplicationResult>
57+
{};
58+
4959
}; // TEvController
5060

5161
}

0 commit comments

Comments
 (0)