Skip to content

Commit 1fe6460

Browse files
authored
Merge 66a5ab1 into f2c77f7
2 parents f2c77f7 + 66a5ab1 commit 1fe6460

34 files changed

+548
-160
lines changed

ydb/core/base/path.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,12 @@ inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TStr
3737
return path;
3838
}
3939

40+
inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TVector<TString>& childPath) {
41+
auto path = parentPath;
42+
for (const auto& childName : childPath) {
43+
path.push_back(childName);
44+
}
45+
return path;
46+
}
47+
4048
}

ydb/core/grpc_services/rpc_replication.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
#include <google/protobuf/util/time_util.h>
1313

14+
#include <util/string/builder.h>
15+
1416
namespace NKikimr::NGRpcService {
1517

1618
using namespace Ydb;
@@ -138,9 +140,18 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
138140
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
139141
}
140142

143+
static TString BuildConnectionString(const NKikimrReplication::TConnectionParams& params) {
144+
return TStringBuilder()
145+
<< (params.GetEnableSsl() ? "grpcs://" : "grpc://")
146+
<< params.GetEndpoint()
147+
<< "/?database=" << params.GetDatabase();
148+
}
149+
141150
static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
142151
to.set_endpoint(from.GetEndpoint());
143152
to.set_database(from.GetDatabase());
153+
to.set_enable_ssl(from.GetEnableSsl());
154+
to.set_connection_string(BuildConnectionString(from));
144155

145156
switch (from.GetCredentialsCase()) {
146157
case NKikimrReplication::TConnectionParams::kStaticCredentials:

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,6 +1650,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
16501650
const auto parseResult = NYdb::ParseConnectionString(*connectionString);
16511651
params.SetEndpoint(parseResult.Endpoint);
16521652
params.SetDatabase(parseResult.Database);
1653+
params.SetEnableSsl(parseResult.EnableSsl);
16531654
}
16541655
if (const auto& endpoint = settings.Settings.Endpoint) {
16551656
params.SetEndpoint(*endpoint);

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5605,6 +5605,61 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
56055605
UNIT_ASSERT_EQUAL_C(describe.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
56065606
}
56075607
}
5608+
5609+
void AsyncReplicationConnectionParams(TKikimrRunner& kikimr, const TString& connectionParam, bool ssl = false) {
5610+
using namespace NReplication;
5611+
5612+
auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
5613+
auto db = kikimr.GetTableClient();
5614+
auto session = db.CreateSession().GetValueSync().GetSession();
5615+
5616+
{
5617+
auto query = R"(
5618+
--!syntax_v1
5619+
CREATE TABLE `/Root/table` (Key Uint64, Value String, PRIMARY KEY (Key));
5620+
)";
5621+
5622+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
5623+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
5624+
}
5625+
{
5626+
auto query = Sprintf(R"(
5627+
--!syntax_v1
5628+
CREATE ASYNC REPLICATION `/Root/replication` FOR
5629+
`/Root/table` AS `/Root/replica`
5630+
WITH (
5631+
%s, TOKEN = "root@builtin"
5632+
);
5633+
)", connectionParam.c_str());
5634+
5635+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
5636+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
5637+
}
5638+
{
5639+
const auto result = repl.DescribeReplication("/Root/replication").ExtractValueSync();
5640+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
5641+
5642+
const auto& params = result.GetReplicationDescription().GetConnectionParams();
5643+
UNIT_ASSERT_VALUES_EQUAL(params.GetDiscoveryEndpoint(), kikimr.GetEndpoint());
5644+
UNIT_ASSERT_VALUES_EQUAL(params.GetDatabase(), "/Root");
5645+
UNIT_ASSERT_VALUES_EQUAL(params.GetEnableSsl(), ssl);
5646+
}
5647+
}
5648+
5649+
Y_UNIT_TEST(AsyncReplicationConnectionString) {
5650+
TKikimrRunner kikimr;
5651+
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(CONNECTION_STRING = "grpc://%s/?database=/Root")", kikimr.GetEndpoint().c_str()));
5652+
}
5653+
5654+
Y_UNIT_TEST(AsyncReplicationConnectionStringWithSsl) {
5655+
TKikimrRunner kikimr;
5656+
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(CONNECTION_STRING = "grpcs://%s/?database=/Root")", kikimr.GetEndpoint().c_str()), true);
5657+
}
5658+
5659+
Y_UNIT_TEST(AsyncReplicationEndpointAndDatabase) {
5660+
TKikimrRunner kikimr;
5661+
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(ENDPOINT = "%s", DATABASE = "/Root")", kikimr.GetEndpoint().c_str()));
5662+
}
56085663
}
56095664

56105665
Y_UNIT_TEST_SUITE(KqpOlapScheme) {

ydb/core/protos/replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ message TOAuthToken {
1919
message TConnectionParams {
2020
optional string Endpoint = 1;
2121
optional string Database = 2;
22+
optional bool EnableSsl = 5;
2223
// credentials
2324
oneof Credentials {
2425
TStaticCredentials StaticCredentials = 3;

ydb/core/tx/replication/controller/dst_alterer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
4141

4242
switch (Kind) {
4343
case TReplication::ETargetKind::Table:
44+
case TReplication::ETargetKind::IndexTable:
4445
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
4546
PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId());
4647
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(

0 commit comments

Comments
 (0)