Skip to content

YQ-4084 Add available external data sources #15192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@ namespace NKikimr::NExternalSource {
namespace {

struct TExternalSourceFactory : public IExternalSourceFactory {
TExternalSourceFactory(const TMap<TString, IExternalSource::TPtr>& sources)
TExternalSourceFactory(
const TMap<TString, IExternalSource::TPtr>& sources,
const std::set<TString>& availableExternalDataSources)
: Sources(sources)
, AvailableExternalDataSources(availableExternalDataSources)
{}

IExternalSource::TPtr GetOrCreate(const TString& type) const override {
auto it = Sources.find(type);
if (it != Sources.end()) {
return it->second;
if (it == Sources.end()) {
throw TExternalSourceException() << "External source with type " << type << " was not found";
}
throw TExternalSourceException() << "External source with type " << type << " was not found";
if (!AvailableExternalDataSources.contains(type)) {
throw TExternalSourceException() << "External source with type " << type << " is disabled. Please contact your system administrator to enable it";
}
return it->second;
}

private:
TMap<TString, IExternalSource::TPtr> Sources;
const TMap<TString, IExternalSource::TPtr> Sources;
const std::set<TString> AvailableExternalDataSources;
};

}
Expand All @@ -37,7 +44,8 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer,
bool allowLocalFiles) {
bool allowLocalFiles,
const std::set<TString>& availableExternalDataSources) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
Expand Down Expand Up @@ -84,7 +92,8 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
ToString(NYql::EDatabaseType::Solomon),
CreateExternalDataSource(TString{NYql::SolomonProviderName}, {"NONE", "TOKEN"}, {}, hostnamePatternsRegEx)
}
});
},
availableExternalDataSources);
}

}
4 changes: 3 additions & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "external_source.h"
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>

namespace NKikimr::NExternalSource {

Expand All @@ -16,6 +17,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
size_t pathsLimit = 50000,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
bool enableInfer = false,
bool allowLocalFiles = false);
bool allowLocalFiles = false,
const std::set<TString>& availableExternalDataSources = NYql::GetAllExternalDataSourceTypes());

}
13 changes: 13 additions & 0 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "kqp_federated_query_helpers.h"

#include <ydb/library/actors/http/http_proxy.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>

#include <ydb/core/base/counters.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/protos/auth.pb.h>
Expand Down Expand Up @@ -56,6 +59,11 @@ namespace NKikimr::NKqp {
return std::make_pair(ToString(host), scheme == "grpcs");
}

bool IsValidExternalDataSourceType(const TString& type) {
static auto allTypes = NYql::GetAllExternalDataSourceTypes();
return allTypes.contains(type);
}

// TKqpFederatedQuerySetupFactoryDefault contains network clients and service actors necessary
// for federated queries. HTTP Gateway (required by S3 provider) is run by default even without
// explicit configuration. Token Accessor and Connector Client are run only if config is provided.
Expand Down Expand Up @@ -161,6 +169,11 @@ namespace NKikimr::NKqp {
return std::make_shared<TKqpFederatedQuerySetupFactoryNoop>();
}

for (const auto& source : appConfig.GetQueryServiceConfig().GetAvailableExternalDataSources()) {
if (!IsValidExternalDataSourceType(source)) {
ythrow yexception() << "wrong AvailableExternalDataSources \"" << source << "\"";
}
}
return std::make_shared<NKikimr::NKqp::TKqpFederatedQuerySetupFactoryDefault>(setup, appData, appConfig);
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,15 @@ class TKqpHost : public IKqpHost {
}

if (FederatedQuerySetup) {
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({},
const auto& hostnamePatterns = QueryServiceConfig.GetHostnamePatterns();
const auto& availableExternalDataSources = QueryServiceConfig.GetAvailableExternalDataSources();
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
ActorSystem,
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles(),
std::set<TString>(availableExternalDataSources.cbegin(), availableExternalDataSources.cend()));
}
}

Expand Down
48 changes: 36 additions & 12 deletions ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,37 +296,47 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestCreateExternalTable) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
}

Y_UNIT_TEST(TestCreateSameExternalTable) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table", true);
}

Y_UNIT_TEST(TestDropExternalTable) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestCreateExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
TestDropExternalTable(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_table");
}

Y_UNIT_TEST(TestDropExternalDataSource) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestDropExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
}

Y_UNIT_TEST(TestLoadExternalTable) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -366,7 +376,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestLoadServiceAccountSecretValueFromExternalDataSourceMetadata) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -404,7 +416,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestLoadBasicSecretValueFromExternalDataSourceMetadata) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("PostgreSQL");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -435,7 +449,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestLoadMdbBasicSecretValueFromExternalDataSourceMetadata) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("PostgreSQL");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -473,7 +489,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestLoadAwsSecretValueFromExternalDataSourceMetadata) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -510,7 +528,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestLoadDataSourceProperties) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("PostgreSQL");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -558,7 +578,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestLoadTokenSecretValueFromExternalDataSourceMetadata) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("YT");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -588,7 +610,9 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
}

Y_UNIT_TEST(TestSecretsExistingValidation) {
TKikimrRunner kikimr;
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetAppConfig(appCfg));
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace NKqp {
}
if (!kikimrSettings.FeatureFlags.HasEnableExternalDataSources()) {
kikimrSettings.SetEnableExternalDataSources(true);
kikimrSettings.AppConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
}

Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
if (!appConfig) {
appConfig.emplace();
}
appConfig->MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
appConfig->MutableQueryServiceConfig()->AddAvailableExternalDataSources("ClickHouse");
appConfig->MutableQueryServiceConfig()->AddAvailableExternalDataSources("PostgreSQL");
appConfig->MutableQueryServiceConfig()->AddAvailableExternalDataSources("MySQL");
appConfig->MutableQueryServiceConfig()->AddAvailableExternalDataSources("Ydb");

auto settings = TKikimrSettings();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ namespace NKikimr::NKqp {
appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableConnector()->MutableEndpoint()->set_host("localhost");
appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableConnector()->MutableEndpoint()->set_port(1234);
appConfig.MutableQueryServiceConfig()->MutableGeneric()->MutableDefaultSettings()->Add(std::move(dateTimeFormat));
appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ObjectStorage");
appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("ClickHouse");
appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("PostgreSQL");
appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("MySQL");
appConfig.MutableQueryServiceConfig()->AddAvailableExternalDataSources("Ydb");
return appConfig;
}

Expand Down
Loading
Loading