Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,16 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
{
ToString(NYql::EDatabaseType::MsSQLServer),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
}});
},
{
ToString(NYql::EDatabaseType::Oracle),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "service_name"}, hostnamePatternsRegEx)
},
{
ToString(NYql::EDatabaseType::Logging),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"SERVICE_ACCOUNT"}, {"folder_id"}, hostnamePatternsRegEx)
}
});
}

}
10 changes: 10 additions & 0 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,16 @@ void AddClustersFromConnections(
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kLogging: {
const auto& connection = conn.content().setting().logging();
auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::LOGGING);
clusterCfg->SetName(connectionName);
clusterCfg->mutable_datasourceoptions()->insert({"folder_id", connection.folder_id()});
FillClusterAuth(*clusterCfg, connection.auth(), authToken, accountIdSignatures);
clusters.emplace(connectionName, GenericProviderName);
break;
}

// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/fq/libs/cloud_audit/yq_cloud_audit_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ std::string MapConnectionType(const FederatedQuery::ConnectionSetting::Connectio
return "Monitoring";
case FederatedQuery::ConnectionSetting::ConnectionCase::kPostgresqlCluster:
return "PostgreSQLCluster";
default:
case FederatedQuery::ConnectionSetting::ConnectionCase::kGreenplumCluster:
return "GreenplumCluster";
case FederatedQuery::ConnectionSetting::ConnectionCase::kMysqlCluster:
return "MySQLCluster";
case FederatedQuery::ConnectionSetting::ConnectionCase::kLogging:
return "Logging";
case FederatedQuery::ConnectionSetting::ConnectionCase::CONNECTION_NOT_SET:
Y_ENSURE(false, "Invalid connection case " << i32(connectionCase));
}
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/fq/libs/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kLogging: {
return GetServiceAccountId(setting.logging().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down Expand Up @@ -173,6 +176,8 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
return setting.greenplum_cluster().login();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().login();
case FederatedQuery::ConnectionSetting::kLogging:
return {};
}
}

Expand All @@ -196,6 +201,8 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
return setting.greenplum_cluster().password();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().password();
case FederatedQuery::ConnectionSetting::kLogging:
return {};
}
}

Expand All @@ -219,6 +226,8 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return GetBasicAuthMethod(setting.mysql_cluster().auth());
case FederatedQuery::ConnectionSetting::kLogging:
return GetIamAuthMethod(setting.logging().auth());
}
}

Expand All @@ -240,6 +249,8 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
return connection.content().setting().greenplum_cluster().auth();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return connection.content().setting().mysql_cluster().auth();
case FederatedQuery::ConnectionSetting::kLogging:
return connection.content().setting().logging().auth();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth{};
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/compute/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class TComputeConfig {
}
}

// This function shows which external data sources are currently supported by the open-source YDB
// and which ones are not yet supported.
bool IsConnectionCaseEnabled(
const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const {
switch (connectionCase) {
Expand All @@ -167,6 +169,7 @@ class TComputeConfig {
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
case FederatedQuery::ConnectionSetting::kMysqlCluster:
case FederatedQuery::ConnectionSetting::kYdbDatabase:
case FederatedQuery::ConnectionSetting::kLogging:
return true;
case FederatedQuery::ConnectionSetting::kDataStreams:
case FederatedQuery::ConnectionSetting::kMonitoring:
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ TString MakeCreateExternalDataSourceQuery(
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});

}
break;
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
Expand All @@ -260,7 +259,15 @@ TString MakeCreateExternalDataSourceQuery(
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");

}
case FederatedQuery::ConnectionSetting::kLogging: {
properties = fmt::format(
R"(
SOURCE_TYPE="Logging",
FOLDER_ID={folder_id}
)",
"folder_id"_a = EncloseAndEscapeString(connectionContent.setting().logging().folder_id(), '"'));
break;
}
break;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/utils/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kLogging: {
return GetServiceAccountId(setting.logging().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down
47 changes: 20 additions & 27 deletions ydb/core/fq/libs/control_plane_storage/request_validators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ void ValidateGenericConnectionSetting(
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
}

if (!connection.database_id() && !(connection.host() && connection.port())) {
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.{database_id or host,port} field is not specified";
if (!connection.database_id()) {
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.database_id field is not specified";
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
}

Expand Down Expand Up @@ -75,34 +75,11 @@ NYql::TIssues ValidateConnectionSetting(
break;
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
const FederatedQuery::GreenplumCluster& greenplumCluster = setting.greenplum_cluster();

if (!greenplumCluster.has_auth() || greenplumCluster.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.auth field is not specified"));
}

if (greenplumCluster.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
}

if (!greenplumCluster.database_id() && !greenplumCluster.database_name()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.{database_id or database_name} field is not specified"));
}
ValidateGenericConnectionSetting(setting.greenplum_cluster(), "greenplum", disableCurrentIam, passwordRequired, issues);
break;
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
const FederatedQuery::MySQLCluster database = setting.mysql_cluster();
if (!database.has_auth() || database.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.auth field is not specified"));
}

if (database.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
}

if (!database.database_id() && !database.database_name()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.{database_id or database_name} field is not specified"));
}
ValidateGenericConnectionSetting(setting.mysql_cluster(), "mysql", disableCurrentIam, passwordRequired, issues);
break;
}
case FederatedQuery::ConnectionSetting::kObjectStorage: {
Expand Down Expand Up @@ -154,6 +131,22 @@ NYql::TIssues ValidateConnectionSetting(
}
break;
}
case FederatedQuery::ConnectionSetting::kLogging: {
const FederatedQuery::Logging logging = setting.logging();
if (!logging.has_auth() || logging.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.logging.auth field is not specified"));
}

if (logging.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
}

if (!logging.folder_id()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.logging.folder_id field is not specified"));
}

break;
}
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection is not set"));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio
return setting.greenplum_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::kLogging:
return setting.logging().auth().identity_case();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth::IDENTITY_NOT_SET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
static const TSet<TString> properties {
"database_name",
"protocol", // managed PG, CH
"mdb_cluster_id", // managed PG, CH
"mdb_cluster_id", // managed PG, CH, GP, MY
"database_id", // managed YDB
"use_tls",
"schema", // managed PG
"schema", // managed PG, GP
"service_name", // oracle
"folder_id" // logging
};

for (const auto& property: properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ enum class EDatabaseType {
YT,
MySQL,
Greenplum,
MsSQLServer
MsSQLServer,
Oracle,
Logging
};

inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourceKind dataSourceKind) {
Expand All @@ -34,6 +36,10 @@ inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourc
return EDatabaseType::Greenplum;
case NConnector::NApi::EDataSourceKind::MS_SQL_SERVER:
return EDatabaseType::MsSQLServer;
case NConnector::NApi::EDataSourceKind::ORACLE:
return EDatabaseType::Oracle;
case NConnector::NApi::EDataSourceKind::LOGGING:
return EDatabaseType::Logging;
default:
ythrow yexception() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind);
}
Expand All @@ -53,6 +59,10 @@ inline NConnector::NApi::EDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseT
return NConnector::NApi::EDataSourceKind::GREENPLUM;
case EDatabaseType::MsSQLServer:
return NConnector::NApi::EDataSourceKind::MS_SQL_SERVER;
case EDatabaseType::Oracle:
return NConnector::NApi::EDataSourceKind::ORACLE;
case EDatabaseType::Logging:
return NConnector::NApi::EDataSourceKind::LOGGING;
default:
ythrow yexception() << "Unknown database type: " << ToString(databaseType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,16 @@ namespace NYql::NDq {
args.MaxKeysInRequest);
};

for (auto& name : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric", "MySqlGeneric", "GreenplumGeneric", "MsSQLServerGeneric"}) {
for (auto& name : {
"ClickHouseGeneric",
"PostgreSqlGeneric",
"YdbGeneric",
"MySqlGeneric",
"GreenplumGeneric",
"MsSQLServerGeneric",
"OracleGeneric",
"LoggingGeneric"}
) {
factory.RegisterSource<Generic::TSource>(name, readActorFactory);
factory.RegisterLookupSource<Generic::TLookupSource>(name, lookupActorFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ namespace NYql::NDq {

auto tokenProvider = CreateGenericTokenProvider(
source.GetToken(),
source.GetServiceAccountId(), source.GetServiceAccountIdSignature(),
source.GetServiceAccountId(),
source.GetServiceAccountIdSignature(),
credentialsFactory);

const auto actor = new TGenericReadActor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ namespace NYql::NDq {
}

TGenericTokenProvider::TGenericTokenProvider(
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
Y_ENSURE(!serviceAccountId.Empty(), "No service account provided");
Y_ENSURE(!ServiceAccountIdSignature.Empty(), "No service account signature provided");
Y_ENSURE(!serviceAccountId.empty(), "No service account provided");
Y_ENSURE(!serviceAccountIdSignature.empty(), "No service account signature provided");
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON =
TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, ServiceAccountIdSignature).ToJson();
TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, serviceAccountIdSignature).ToJson();

Y_ENSURE(structuredTokenJSON, "empty structured token");

Expand Down Expand Up @@ -58,7 +59,8 @@ namespace NYql::NDq {
}

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const TString& staticIamToken, const TString& serviceAccountId,
CreateGenericTokenProvider(const TString& staticIamToken,
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
if (!staticIamToken.Empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace NYql::NDq {
TGenericTokenProvider::TPtr
CreateGenericTokenProvider(
const TString& staticIamToken,
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ enum EDataSourceKind {
MYSQL = 5;
MS_SQL_SERVER = 6;
GREENPLUM = 7;
ORACLE = 8;
LOGGING = 9;
}

// EProtocol generalizes various kinds of network protocols supported by different databases.
Expand Down Expand Up @@ -68,6 +70,19 @@ message TGreenplumDataSourceOptions {
string schema = 1;
}

// TOracleDataSourceOptions represents settings specific to Oracle
message TOracleDataSourceOptions {
// Oracle service_name - alias to SID of oracle INSTANCE, or SID, or PDB.
// More about connection options in Oracle docs:
// https://docs.oracle.com/en/database/other-databases/essbase/21/essoa/connection-string-formats.html
string service_name = 1;
}

// TLoggingDataSourceOptions represents settings specific to Logging
message TLoggingDataSourceOptions {
string folder_id = 1;
}

// TDataSourceInstance helps to identify the instance of a data source to redirect request to.
message TDataSourceInstance {
// Data source kind
Expand All @@ -90,5 +105,7 @@ message TDataSourceInstance {
TClickhouseDataSourceOptions ch_options = 8;
TS3DataSourceOptions s3_options = 9;
TGreenplumDataSourceOptions gp_options = 10;
TOracleDataSourceOptions oracle_options = 11;
TLoggingDataSourceOptions logging_options = 12;
}
}
Loading
Loading