Skip to content

Commit 9f18f65

Browse files
authored
Merge dd98c3e into d52cc84
2 parents d52cc84 + dd98c3e commit 9f18f65

File tree

14 files changed

+306
-13
lines changed

14 files changed

+306
-13
lines changed

ydb/core/external_sources/external_data_source.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ struct TExternalDataSource : public IExternalSource {
3636
ythrow TExternalSourceException() << "Only external table supports parameters";
3737
}
3838

39+
bool IsRDBMSDataSource(const TProtoStringType& sourceType) const {
40+
return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "Clickhouse"}, sourceType);
41+
}
42+
3943
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
4044
NKikimrSchemeOp::TExternalDataSourceDescription proto;
4145
if (!proto.ParseFromString(externalDataSourceDescription)) {
@@ -49,6 +53,10 @@ struct TExternalDataSource : public IExternalSource {
4953
ythrow TExternalSourceException() << "Unsupported property: " << key;
5054
}
5155

56+
if (IsRDBMSDataSource(proto.GetSourceType()) && !proto.GetProperties().GetProperties().contains("database_name")){
57+
ythrow TExternalSourceException() << proto.GetSourceType() << " source must provide database_name";
58+
}
59+
5260
ValidateHostname(HostnamePatterns, proto.GetLocation());
5361
}
5462

ydb/core/fq/libs/actors/clusters_from_connections.cpp

+24
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,30 @@ void AddClustersFromConnections(
271271
clusters.emplace(connectionName, GenericProviderName);
272272
break;
273273
}
274+
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
275+
FillGenericClusterConfig(
276+
common,
277+
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
278+
conn.content().setting().greenplum_cluster(),
279+
connectionName,
280+
NYql::NConnector::NApi::EDataSourceKind::GREENPLUM,
281+
authToken,
282+
accountIdSignatures);
283+
clusters.emplace(connectionName, GenericProviderName);
284+
break;
285+
}
286+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
287+
FillGenericClusterConfig(
288+
common,
289+
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
290+
conn.content().setting().mysql_cluster(),
291+
connectionName,
292+
NYql::NConnector::NApi::EDataSourceKind::MYSQL,
293+
authToken,
294+
accountIdSignatures);
295+
clusters.emplace(connectionName, GenericProviderName);
296+
break;
297+
}
274298

275299
// Do not replace with default. Adding a new connection should cause a compilation error
276300
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:

ydb/core/fq/libs/actors/database_resolver.cpp

+53-3
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
319319
// There are two kinds of managed YDBs: serverless and dedicated.
320320
// While working with dedicated databases, we have to use underlay network.
321321
// That's why we add `u-` prefix to database fqdn.
322-
if (databaseInfo.GetMap().contains("dedicatedDatabase")) {
322+
if (databaseInfo.GetMap().contains("storageConfig")) {
323323
endpoint = "u-" + endpoint;
324324
host = "u-" + host;
325325
}
@@ -335,7 +335,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
335335
{
336336
auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls, protocol);
337337
// TODO: Take explicit field from MVP
338-
bool isDedicatedDb = databaseInfo.GetMap().contains("dedicatedDatabase");
338+
bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig");
339339
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
340340
// Replace "ydb." -> "yds."
341341
ret.Endpoint[2] = 's';
@@ -457,6 +457,56 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
457457

458458
endpoint = mdbEndpointGenerator->ToEndpoint(params);
459459

460+
return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
461+
};
462+
Parsers[NYql::EDatabaseType::MySQL] = [](
463+
NJson::TJsonValue& databaseInfo,
464+
const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
465+
bool useTls,
466+
NConnector::NApi::EProtocol protocol
467+
) {
468+
NYql::IMdbEndpointGenerator::TEndpoint endpoint;
469+
TVector<TString> aliveHosts;
470+
471+
const auto& hostsArray = databaseInfo.GetMap().at("hosts").GetArraySafe();
472+
473+
for (const auto& host : hostsArray) {
474+
const auto& hostMap = host.GetMap();
475+
476+
if (!hostMap.contains("services")) {
477+
// indicates that cluster is down
478+
continue;
479+
}
480+
481+
const auto& servicesArray = hostMap.at("services").GetArraySafe();
482+
483+
// check if all services of a particular host are alive
484+
const bool alive = std::all_of(
485+
servicesArray.begin(),
486+
servicesArray.end(),
487+
[](const auto& service) {
488+
return service["health"].GetString() == "ALIVE";
489+
}
490+
);
491+
492+
if (alive) {
493+
aliveHosts.push_back(host["name"].GetString());
494+
}
495+
}
496+
497+
if (aliveHosts.empty()) {
498+
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
499+
}
500+
501+
NYql::IMdbEndpointGenerator::TParams params = {
502+
.DatabaseType = NYql::EDatabaseType::MySQL,
503+
.MdbHost = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
504+
.UseTls = useTls,
505+
.Protocol = protocol,
506+
};
507+
508+
endpoint = mdbEndpointGenerator->ToEndpoint(params);
509+
460510
return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
461511
};
462512
}
@@ -538,7 +588,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
538588
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
539589
.AddUrlParam("databaseId", databaseId)
540590
.Build();
541-
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
591+
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
542592
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
543593
url = TUrlBuilder(
544594
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")

ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp

+77-4
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
242242
R"(
243243
{
244244
"endpoint":"grpcs://lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1gtl2kg13him37quoo6/etn021us5r9rhld1vgbh",
245-
"dedicatedDatabase":{"resuorcePresetId": "medium"}
245+
"storageConfig":{"storageSizeLimit":107374182400}
246246
})",
247247
NYql::TDatabaseResolverResponse::TDatabaseDescription{
248248
TString{"u-lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135"},
@@ -285,7 +285,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
285285
R"(
286286
{
287287
"endpoint":"grpcs://lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh",
288-
"dedicatedDatabase":{"resourcePresetId": "medium"}
288+
"storageConfig":{"storageSizeLimit":107374182400}
289289
})",
290290
NYql::TDatabaseResolverResponse::TDatabaseDescription{
291291
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
@@ -473,6 +473,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
473473
issues
474474
);
475475
}
476+
476477
Y_UNIT_TEST(Greenplum_MasterNode) {
477478
Test(
478479
NYql::EDatabaseType::Greenplum,
@@ -504,7 +505,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
504505
TString(""),
505506
true},
506507
{});
507-
}
508+
}
508509

509510
Y_UNIT_TEST(Greenplum_PermissionDenied) {
510511
NYql::TIssues issues{
@@ -535,7 +536,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
535536
)",
536537
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
537538
issues);
538-
}
539+
}
540+
541+
Y_UNIT_TEST(MySQL) {
542+
Test(
543+
NYql::EDatabaseType::MySQL,
544+
NYql::NConnector::NApi::EProtocol::NATIVE,
545+
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
546+
"200",
547+
R"({
548+
"hosts": [
549+
{
550+
"services": [
551+
{
552+
"type": "POOLER",
553+
"health": "ALIVE"
554+
},
555+
{
556+
"type": "MYSQL",
557+
"health": "ALIVE"
558+
}
559+
],
560+
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
561+
"clusterId": "c9qb2bjghs8onbncpamk",
562+
"zoneId": "ru-central1-b",
563+
"role": "MASTER",
564+
"health": "ALIVE"
565+
}
566+
]
567+
})",
568+
NYql::TDatabaseResolverResponse::TDatabaseDescription{
569+
TString{""},
570+
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
571+
3306,
572+
TString(""),
573+
true
574+
},
575+
{});
576+
}
577+
578+
Y_UNIT_TEST(MySQL_PermissionDenied) {
579+
NYql::TIssues issues{
580+
NYql::TIssue(
581+
TStringBuilder{} << MakeErrorPrefix(
582+
"mdb.api.cloud.yandex.net:443",
583+
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
584+
"etn021us5r9rhld1vgbh",
585+
NYql::EDatabaseType::MySQL
586+
) << NoPermissionStr
587+
)
588+
};
589+
590+
Test(
591+
NYql::EDatabaseType::MySQL,
592+
NYql::NConnector::NApi::EProtocol::NATIVE,
593+
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
594+
"403",
595+
R"(
596+
{
597+
"code": 7,
598+
"message": "Permission denied",
599+
"details": [
600+
{
601+
"@type": "type.googleapis.com/google.rpc.RequestInfo",
602+
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
603+
}
604+
]
605+
}
606+
)",
607+
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
608+
issues
609+
);
610+
}
611+
539612

540613
Y_UNIT_TEST(DataStreams_PermissionDenied) {
541614
NYql::TIssues issues{

ydb/core/fq/libs/common/util.cpp

+22
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
126126
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
127127
return GetServiceAccountId(setting.postgresql_cluster().auth());
128128
}
129+
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
130+
return GetServiceAccountId(setting.greenplum_cluster().auth());
131+
}
132+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
133+
return GetServiceAccountId(setting.mysql_cluster().auth());
134+
}
129135
// Do not replace with default. Adding a new connection should cause a compilation error
130136
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
131137
break;
@@ -157,6 +163,10 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
157163
return {};
158164
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
159165
return setting.postgresql_cluster().login();
166+
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
167+
return setting.greenplum_cluster().login();
168+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
169+
return setting.mysql_cluster().login();
160170
}
161171
}
162172

@@ -176,6 +186,10 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
176186
return {};
177187
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
178188
return setting.postgresql_cluster().password();
189+
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
190+
return setting.greenplum_cluster().password();
191+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
192+
return setting.mysql_cluster().password();
179193
}
180194
}
181195

@@ -195,6 +209,10 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
195209
return GetIamAuthMethod(setting.monitoring().auth());
196210
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
197211
return GetBasicAuthMethod(setting.postgresql_cluster().auth());
212+
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
213+
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
214+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
215+
return GetBasicAuthMethod(setting.mysql_cluster().auth());
198216
}
199217
}
200218

@@ -212,6 +230,10 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
212230
return connection.content().setting().monitoring().auth();
213231
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
214232
return connection.content().setting().postgresql_cluster().auth();
233+
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
234+
return connection.content().setting().greenplum_cluster().auth();
235+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
236+
return connection.content().setting().mysql_cluster().auth();
215237
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
216238
return FederatedQuery::IamAuth{};
217239
}

ydb/core/fq/libs/compute/common/config.h

+2
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ class TComputeConfig {
164164
case FederatedQuery::ConnectionSetting::kObjectStorage:
165165
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
166166
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
167+
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
168+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
167169
case FederatedQuery::ConnectionSetting::kYdbDatabase:
168170
return true;
169171
case FederatedQuery::ConnectionSetting::kDataStreams:

ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp

+34-3
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,8 @@ TString MakeCreateExternalDataSourceQuery(
214214
}
215215
case FederatedQuery::ConnectionSetting::kMonitoring:
216216
break;
217-
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
218-
const auto schema = connectionContent.setting().postgresql_cluster().schema();
217+
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
218+
const auto pgschema = connectionContent.setting().postgresql_cluster().schema();
219219
properties = fmt::format(
220220
R"(
221221
SOURCE_TYPE="PostgreSQL",
@@ -228,7 +228,38 @@ TString MakeCreateExternalDataSourceQuery(
228228
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"'),
229229
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_name(), '"'),
230230
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
231-
"schema"_a = schema ? ", SCHEMA=" + EncloseAndEscapeString(schema, '"') : TString{});
231+
"schema"_a = pgschema ? ", SCHEMA=" + EncloseAndEscapeString(pgschema, '"') : TString{});
232+
}
233+
break;
234+
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
235+
const auto gpschema = connectionContent.setting().greenplum_cluster().schema();
236+
properties = fmt::format(
237+
R"(
238+
SOURCE_TYPE="Greenplum",
239+
MDB_CLUSTER_ID={mdb_cluster_id},
240+
DATABASE_NAME={database_name},
241+
USE_TLS="{use_tls}"
242+
{schema}
243+
)",
244+
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_id(), '"'),
245+
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_name(), '"'),
246+
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
247+
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});
248+
249+
}
250+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
251+
properties = fmt::format(
252+
R"(
253+
SOURCE_TYPE="MySQL",
254+
MDB_CLUSTER_ID={mdb_cluster_id},
255+
DATABASE_NAME={database_name},
256+
USE_TLS="{use_tls}"
257+
)",
258+
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_id(), '"'),
259+
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_name(), '"'),
260+
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
261+
262+
}
232263
break;
233264
}
234265

ydb/core/fq/libs/control_plane_proxy/utils/utils.h

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
3131
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
3232
return GetServiceAccountId(setting.postgresql_cluster().auth());
3333
}
34+
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
35+
return GetServiceAccountId(setting.greenplum_cluster().auth());
36+
}
37+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
38+
return GetServiceAccountId(setting.mysql_cluster().auth());
39+
}
3440
// Do not replace with default. Adding a new connection should cause a compilation error
3541
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
3642
break;

0 commit comments

Comments
 (0)