Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,54 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
Parsers[NYql::EDatabaseType::MySQL] = [](
NJson::TJsonValue& databaseInfo,
const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
bool useTls,
NConnector::NApi::EProtocol protocol
) {
NYql::IMdbEndpointGenerator::TEndpoint endpoint;
TVector<TString> aliveHosts;

const auto& hostsArray = databaseInfo.GetMap().at("hosts").GetArraySafe();

for (const auto& host : hostsArray) {
const auto& hostMap = host.GetMap();

if (!hostMap.contains("services")) {
// indicates that cluster is down
continue;
}

// check if all services of a particular host are alive
const bool alive = std::all_of(
hostMap.at("services").GetArraySafe().begin(),
hostMap.at("services").GetArraySafe().end(),
[](const auto& service) {
return service["health"].GetString() == "ALIVE";
}
);

if (alive) {
aliveHosts.push_back(host["name"].GetString());
}
}

if (aliveHosts.empty()) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
}

NYql::IMdbEndpointGenerator::TParams params = {
.DatabaseType = NYql::EDatabaseType::MySQL,
.MdbHost = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
.UseTls = useTls,
.Protocol = protocol,
};

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
}
Expand Down Expand Up @@ -538,7 +586,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
Expand Down
77 changes: 75 additions & 2 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
issues
);
}

Y_UNIT_TEST(Greenplum_MasterNode) {
Test(
NYql::EDatabaseType::Greenplum,
Expand Down Expand Up @@ -505,7 +506,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
TString(""),
true},
{});
}
}

Y_UNIT_TEST(Greenplum_PermissionDenied) {
NYql::TIssues issues{
Expand Down Expand Up @@ -536,7 +537,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues);
}
}

Y_UNIT_TEST(MySQL) {
Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"200",
R"({
"hosts": [
{
"services": [
{
"type": "POOLER",
"health": "ALIVE"
},
{
"type": "MYSQL",
"health": "ALIVE"
}
],
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
"clusterId": "c9qb2bjghs8onbncpamk",
"zoneId": "ru-central1-b",
"role": "MASTER",
"health": "ALIVE"
}
]
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{""},
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
3306,
TString(""),
true
},
{});
}

Y_UNIT_TEST(MySQL_PermissionDenied) {
NYql::TIssues issues{
NYql::TIssue(
TStringBuilder{} << MakeErrorPrefix(
"mdb.api.cloud.yandex.net:443",
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"etn021us5r9rhld1vgbh",
NYql::EDatabaseType::MySQL
) << NoPermissionStr
)
};

Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"403",
R"(
{
"code": 7,
"message": "Permission denied",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RequestInfo",
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
}
]
}
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues
);
}


Y_UNIT_TEST(DataStreams_PermissionDenied) {
NYql::TIssues issues{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace NFq {

constexpr ui32 GREENPLUM_PORT = 6432;

constexpr ui32 MYSQL_PORT = 3306;

// TMdbEndpointGeneratorLegacy implements behavior required by YQL legacy ClickHouse provider
class TMdbEndpointGeneratorLegacy: public NYql::IMdbEndpointGenerator {
TEndpoint ToEndpoint(const NYql::IMdbEndpointGenerator::TParams& params) const override {
Expand Down Expand Up @@ -76,13 +78,21 @@ namespace NFq {
ythrow yexception() << "Unexpected protocol for PostgreSQL " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
case NYql::EDatabaseType::Greenplum:
// https://cloud.yandex.ru/docs/managed-postgresql/operations/connect
// https://cloud.yandex.ru/docs/managed-greenplum/operations/connect
switch (params.Protocol) {
case NYql::NConnector::NApi::EProtocol::NATIVE:
return TEndpoint(fixedHost, GREENPLUM_PORT);
default:
ythrow yexception() << "Unexpected protocol for Greenplum: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
case NYql::EDatabaseType::MySQL:
// https://cloud.yandex.ru/docs/managed-mysql/operations/connect
switch (params.Protocol) {
case NYql::NConnector::NApi::EProtocol::NATIVE:
return TEndpoint(fixedHost, MYSQL_PORT);
default:
ythrow yexception() << "Unexpected protocol for MySQL: " << NYql::NConnector::NApi::EProtocol_Name(params.Protocol);
}
default:
ythrow yexception() << "Unexpected database type: " << ToString(params.DatabaseType);
};
Expand Down