Skip to content

Commit be594ff

Browse files
committed
YQ-2824: add basis for type inference in KQP metadata loader
1 parent 2f99538 commit be594ff

File tree

10 files changed

+429
-19
lines changed

10 files changed

+429
-19
lines changed

ydb/core/external_sources/external_data_source.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
5252
ValidateHostname(HostnamePatterns, proto.GetLocation());
5353
}
5454

55+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
56+
return NThreading::MakeFuture(std::move(meta));
57+
}
58+
59+
virtual bool CanLoadDynamicMetadata() const override {
60+
return false;
61+
}
62+
5563
private:
5664
const TString Name;
5765
const TVector<TString> AuthMethods;

ydb/core/external_sources/external_source.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,83 @@
11
#pragma once
22

3+
#include <library/cpp/threading/future/core/future.h>
34
#include <util/generic/map.h>
45
#include <util/generic/string.h>
56

67
#include <ydb/core/protos/external_sources.pb.h>
8+
#include <ydb/library/actors/core/actorsystem.h>
79
#include <ydb/library/yql/public/issue/yql_issue.h>
810

911
namespace NKikimr::NExternalSource {
1012

1113
struct TExternalSourceException: public yexception {
1214
};
1315

16+
namespace NAuth {
17+
18+
struct TNone {
19+
static constexpr std::string_view Method = "NONE";
20+
};
21+
22+
struct TAws {
23+
static constexpr std::string_view Method = "AWS";
24+
25+
TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
26+
: AccessKey{accessKey}
27+
, SecretAccessKey{secretAccessKey}
28+
, Region{region}
29+
{}
30+
31+
TString AccessKey;
32+
TString SecretAccessKey;
33+
TString Region;
34+
};
35+
36+
struct TServiceAccount {
37+
static constexpr std::string_view Method = "SERVICE_ACCOUNT";
38+
39+
TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
40+
: ServiceAccountId{std::move(serviceAccountId)}
41+
, ServiceAccountIdSignature{std::move(serviceAccountIdSignature)}
42+
{}
43+
44+
TString ServiceAccountId;
45+
TString ServiceAccountIdSignature;
46+
};
47+
48+
using TAuth = std::variant<TNone, TServiceAccount, TAws>;
49+
50+
std::string_view GetMethod(const TAuth& auth);
51+
52+
inline TAuth MakeNone() {
53+
return TAuth{std::in_place_type_t<TNone>{}};
54+
}
55+
56+
inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
57+
return TAuth{std::in_place_type_t<TServiceAccount>{}, serviceAccountId, serviceAccountIdSignature};
58+
}
59+
60+
inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
61+
return TAuth{std::in_place_type_t<TAws>{}, accessKey, secretAccessKey, region};
62+
}
63+
}
64+
65+
using TAuth = NAuth::TAuth;
66+
67+
struct TMetadata {
68+
bool Changed = false;
69+
TString TableLocation;
70+
TString DataSourceLocation;
71+
TString DataSourcePath;
72+
TString Type;
73+
74+
THashMap<TString, TString> Attributes;
75+
76+
TAuth Auth;
77+
78+
NKikimrExternalSources::TSchema Schema;
79+
};
80+
1481
struct IExternalSource : public TThrRefBase {
1582
using TPtr = TIntrusivePtr<IExternalSource>;
1683

@@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
54121
If an error occurs, an exception is thrown.
55122
*/
56123
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;
124+
125+
/*
126+
Retrieve additional metadata from runtime data, enrich provided metadata
127+
*/
128+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;
129+
130+
/*
131+
A method that should tell whether there is an implementation
132+
of the previous method.
133+
*/
134+
virtual bool CanLoadDynamicMetadata() const = 0;
57135
};
58136

59137
}

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory {
3232

3333
}
3434

35-
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
35+
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
3636
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
3737
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
3838
{
3939
ToString(NYql::EDatabaseType::ObjectStorage),
40-
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
40+
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit)
4141
},
4242
{
4343
ToString(NYql::EDatabaseType::ClickHouse),

ydb/core/external_sources/external_source_factory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase {
1010
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
1111
};
1212

13-
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
13+
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000);
1414

1515
}

ydb/core/external_sources/object_storage.cpp

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
#include "external_source.h"
22
#include "object_storage.h"
33
#include "validation_functions.h"
4+
#include "object_storage/s3_fetcher.h"
45

6+
#include <ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
7+
#include <ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
8+
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
59
#include <ydb/core/protos/external_sources.pb.h>
610
#include <ydb/core/protos/flat_scheme_op.pb.h>
11+
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
712
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
13+
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
814
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
915
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
1016
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
@@ -20,9 +26,10 @@ namespace NKikimr::NExternalSource {
2026
namespace {
2127

2228
struct TObjectStorageExternalSource : public IExternalSource {
23-
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
29+
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit)
2430
: HostnamePatterns(hostnamePatterns)
2531
, PathsLimit(pathsLimit)
32+
, ActorSystem(actorSystem)
2633
{}
2734

2835
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
@@ -255,6 +262,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
255262
return issues;
256263
}
257264

265+
struct TMetadataResult : NYql::NCommon::TOperationResult {
266+
std::shared_ptr<TMetadata> Metadata;
267+
};
268+
269+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
270+
Y_UNUSED(ActorSystem);
271+
// TODO: implement
272+
return NThreading::MakeFuture(std::move(meta));
273+
}
274+
275+
virtual bool CanLoadDynamicMetadata() const override {
276+
return false;
277+
}
278+
258279
private:
259280
static bool IsValidIntervalUnit(const TString& unit) {
260281
static constexpr std::array<std::string_view, 7> IntervalUnits = {
@@ -474,12 +495,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
474495
private:
475496
const std::vector<TRegExMatch> HostnamePatterns;
476497
const size_t PathsLimit;
498+
NActors::TActorSystem* ActorSystem = nullptr;
477499
};
478500

479501
}
480502

481-
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
482-
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);
503+
504+
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
505+
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit);
483506
}
484507

485508
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {

ydb/core/external_sources/object_storage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace NKikimr::NExternalSource {
1010

11-
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
11+
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit);
1212

1313
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
1414

ydb/core/kqp/gateway/kqp_metadata_loader.cpp

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
#include "actors/kqp_ic_gateway_actors.h"
33

44
#include <ydb/core/base/path.h>
5+
#include <ydb/core/external_sources/external_source_factory.h>
56
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
67
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
78
#include <ydb/core/statistics/events.h>
89
#include <ydb/core/statistics/stat_service.h>
910

1011
#include <ydb/library/actors/core/hfunc.h>
1112
#include <ydb/library/actors/core/log.h>
13+
#include <ydb/library/yql/utils/signals/utils.h>
1214

1315

1416
namespace NKikimr::NKqp {
@@ -63,7 +65,7 @@ NavigateEntryResult CreateNavigateEntry(const std::pair<TIndexId, TString>& pair
6365
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpList;
6466
entry.SyncVersion = true;
6567
entry.ShowPrivatePath = settings.WithPrivateTables_;
66-
return {entry, pair.second, std::nullopt};
68+
return {std::move(entry), pair.second, std::nullopt};
6769
}
6870

6971
std::optional<NavigateEntryResult> CreateNavigateExternalEntry(const TString& path, bool externalDataSource) {
@@ -279,6 +281,7 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
279281
tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
280282
tableMeta->ExternalSource.Properties = description.GetProperties();
281283
tableMeta->ExternalSource.DataSourcePath = tableName;
284+
tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path);
282285
return result;
283286
}
284287

@@ -470,6 +473,61 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> LoadExternalDataSo
470473
return DescribeExternalDataSourceSecrets(authDescription, userToken ? userToken->GetUserSID() : "", actorSystem, maximalSecretsSnapshotWaitTime);
471474
}
472475

476+
NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata) {
477+
switch (metadata.DataSourceAuth.identity_case()) {
478+
case NKikimrSchemeOp::TAuth::kNone:
479+
return NExternalSource::NAuth::MakeNone();
480+
case NKikimrSchemeOp::TAuth::kServiceAccount:
481+
return NExternalSource::NAuth::MakeServiceAccount(metadata.DataSourceAuth.GetServiceAccount().GetId(), metadata.ServiceAccountIdSignature);
482+
case NKikimrSchemeOp::TAuth::kAws:
483+
return NExternalSource::NAuth::MakeAws(metadata.AwsAccessKeyId, metadata.AwsSecretAccessKey, metadata.DataSourceAuth.GetAws().GetAwsRegion());
484+
case NKikimrSchemeOp::TAuth::kBasic:
485+
case NKikimrSchemeOp::TAuth::kMdbBasic:
486+
case NKikimrSchemeOp::TAuth::kToken:
487+
case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
488+
Y_ABORT("Unimplemented external source auth: %d", metadata.DataSourceAuth.identity_case());
489+
break;
490+
}
491+
Y_UNREACHABLE();
492+
}
493+
494+
std::shared_ptr<NExternalSource::TMetadata> ConvertToExternalSourceMetadata(const NYql::TKikimrTableMetadata& tableMetadata) {
495+
auto metadata = std::make_shared<NExternalSource::TMetadata>();
496+
metadata->TableLocation = tableMetadata.ExternalSource.TableLocation;
497+
metadata->DataSourceLocation = tableMetadata.ExternalSource.DataSourceLocation;
498+
metadata->DataSourcePath = tableMetadata.ExternalSource.DataSourcePath;
499+
metadata->Attributes = tableMetadata.Attributes;
500+
metadata->Auth = MakeAuth(tableMetadata.ExternalSource);
501+
return metadata;
502+
}
503+
504+
// dynamic metadata from IExternalSource here is propagated into TKikimrTableMetadata, which will be returned as a result of LoadTableMetadata()
505+
bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSource::TMetadata& dynamicMetadata) {
506+
ui32 id = 0;
507+
for (const auto& column : dynamicMetadata.Schema.column()) {
508+
Ydb::Type::PrimitiveTypeId typeId {};
509+
if (column.type().has_type_id()) {
510+
typeId = column.type().type_id();
511+
} else if (column.type().has_optional_type()) {
512+
typeId = column.type().optional_type().item().type_id();
513+
} else {
514+
Y_ABORT_UNLESS(false);
515+
}
516+
const auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(typeId, nullptr);
517+
auto typeName = GetTypeName(typeInfoMod);
518+
519+
tableMetadata.Columns.emplace(
520+
column.name(),
521+
NYql::TKikimrColumnMetadata(
522+
column.name(), id, typeName, !column.type().has_optional_type(), typeInfoMod.TypeInfo, typeInfoMod.TypeMod
523+
)
524+
);
525+
++id;
526+
}
527+
tableMetadata.Attributes = dynamicMetadata.Attributes;
528+
return true;
529+
}
530+
473531
} // anonymous namespace
474532

475533

@@ -680,9 +738,11 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
680738
// In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
681739
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
682740
const bool resolveEntityInsideDataSource = (cluster != Cluster);
741+
TMaybe<TString> externalPath;
683742
TPath entityName = id;
684743
if constexpr (std::is_same_v<TPath, TString>) {
685744
if (resolveEntityInsideDataSource) {
745+
externalPath = entityName;
686746
entityName = cluster;
687747
}
688748
} else {
@@ -720,7 +780,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
720780
ActorSystem,
721781
schemeCacheId,
722782
ev.Release(),
723-
[userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName]
783+
[userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName, externalPath]
724784
(TPromise<TResult> promise, TResponse&& response) mutable
725785
{
726786
try {
@@ -759,16 +819,41 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
759819

760820
switch (entry.Kind) {
761821
case EKind::KindExternalDataSource: {
822+
if (externalPath) {
823+
entry.Path = SplitPath(*externalPath);
824+
}
762825
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
763826
if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) {
764827
promise.SetValue(externalDataSourceMetadata);
765828
return;
766829
}
767830
LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem)
768-
.Subscribe([promise, externalDataSourceMetadata](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
831+
.Subscribe([promise, externalDataSourceMetadata, settings](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
769832
{
770833
UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue());
771-
promise.SetValue(externalDataSourceMetadata);
834+
NExternalSource::IExternalSource::TPtr externalSource;
835+
if (settings.ExternalSourceFactory) {
836+
externalSource = settings.ExternalSourceFactory->GetOrCreate(externalDataSourceMetadata.Metadata->ExternalSource.Type);
837+
}
838+
839+
if (externalSource && externalSource->CanLoadDynamicMetadata()) {
840+
auto externalSourceMeta = ConvertToExternalSourceMetadata(*externalDataSourceMetadata.Metadata);
841+
externalSourceMeta->Attributes = settings.ReadAttributes; // attributes, collected from AST
842+
externalSource->LoadDynamicMetadata(std::move(externalSourceMeta))
843+
.Subscribe([promise = std::move(promise), externalDataSourceMetadata](const TFuture<std::shared_ptr<NExternalSource::TMetadata>>& result) mutable {
844+
TTableMetadataResult wrapper;
845+
if (result.HasValue() && (!result.GetValue()->Changed || EnrichMetadata(*externalDataSourceMetadata.Metadata, *result.GetValue()))) {
846+
wrapper.SetSuccess();
847+
wrapper.Metadata = externalDataSourceMetadata.Metadata;
848+
} else {
849+
// TODO: forward exception from result
850+
wrapper.SetException(yexception() << "LoadDynamicMetadata failed");
851+
}
852+
promise.SetValue(wrapper);
853+
});
854+
} else {
855+
promise.SetValue(externalDataSourceMetadata);
856+
}
772857
});
773858
break;
774859
}
@@ -785,7 +870,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
785870
.Apply([promise, externalTableMetadata](const TFuture<TTableMetadataResult>& result) mutable
786871
{
787872
auto externalDataSourceMetadata = result.GetValue();
788-
promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata));
873+
auto newMetadata = EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata);
874+
promise.SetValue(std::move(newMetadata));
789875
});
790876
break;
791877
}

0 commit comments

Comments
 (0)