Skip to content

Commit 51dda4b

Browse files
authored
YQ-2824: add basis for type inference in KQP metadata loader (#4273)
1 parent 0af7b18 commit 51dda4b

23 files changed

+785
-50
lines changed

ydb/core/external_sources/external_data_source.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
5252
ValidateHostname(HostnamePatterns, proto.GetLocation());
5353
}
5454

55+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
56+
return NThreading::MakeFuture(std::move(meta));
57+
}
58+
59+
virtual bool CanLoadDynamicMetadata() const override {
60+
return false;
61+
}
62+
5563
private:
5664
const TString Name;
5765
const TVector<TString> AuthMethods;

ydb/core/external_sources/external_source.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,83 @@
11
#pragma once
22

3+
#include <library/cpp/threading/future/core/future.h>
34
#include <util/generic/map.h>
45
#include <util/generic/string.h>
56

67
#include <ydb/core/protos/external_sources.pb.h>
8+
#include <ydb/library/actors/core/actorsystem.h>
79
#include <ydb/library/yql/public/issue/yql_issue.h>
810

911
namespace NKikimr::NExternalSource {
1012

1113
struct TExternalSourceException: public yexception {
1214
};
1315

16+
namespace NAuth {
17+
18+
struct TNone {
19+
static constexpr std::string_view Method = "NONE";
20+
};
21+
22+
struct TAws {
23+
static constexpr std::string_view Method = "AWS";
24+
25+
TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
26+
: AccessKey{accessKey}
27+
, SecretAccessKey{secretAccessKey}
28+
, Region{region}
29+
{}
30+
31+
TString AccessKey;
32+
TString SecretAccessKey;
33+
TString Region;
34+
};
35+
36+
struct TServiceAccount {
37+
static constexpr std::string_view Method = "SERVICE_ACCOUNT";
38+
39+
TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
40+
: ServiceAccountId{std::move(serviceAccountId)}
41+
, ServiceAccountIdSignature{std::move(serviceAccountIdSignature)}
42+
{}
43+
44+
TString ServiceAccountId;
45+
TString ServiceAccountIdSignature;
46+
};
47+
48+
using TAuth = std::variant<TNone, TServiceAccount, TAws>;
49+
50+
std::string_view GetMethod(const TAuth& auth);
51+
52+
inline TAuth MakeNone() {
53+
return TAuth{std::in_place_type_t<TNone>{}};
54+
}
55+
56+
inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
57+
return TAuth{std::in_place_type_t<TServiceAccount>{}, serviceAccountId, serviceAccountIdSignature};
58+
}
59+
60+
inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
61+
return TAuth{std::in_place_type_t<TAws>{}, accessKey, secretAccessKey, region};
62+
}
63+
}
64+
65+
using TAuth = NAuth::TAuth;
66+
67+
struct TMetadata {
68+
bool Changed = false;
69+
TString TableLocation;
70+
TString DataSourceLocation;
71+
TString DataSourcePath;
72+
TString Type;
73+
74+
THashMap<TString, TString> Attributes;
75+
76+
TAuth Auth;
77+
78+
NKikimrExternalSources::TSchema Schema;
79+
};
80+
1481
struct IExternalSource : public TThrRefBase {
1582
using TPtr = TIntrusivePtr<IExternalSource>;
1683

@@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
54121
If an error occurs, an exception is thrown.
55122
*/
56123
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;
124+
125+
/*
126+
Retrieve additional metadata from runtime data, enrich provided metadata
127+
*/
128+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;
129+
130+
/*
131+
A method that should tell whether there is an implementation
132+
of the previous method.
133+
*/
134+
virtual bool CanLoadDynamicMetadata() const = 0;
57135
};
58136

59137
}

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory {
3232

3333
}
3434

35-
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
35+
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
3636
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
3737
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
3838
{
3939
ToString(NYql::EDatabaseType::ObjectStorage),
40-
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
40+
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit)
4141
},
4242
{
4343
ToString(NYql::EDatabaseType::ClickHouse),

ydb/core/external_sources/external_source_factory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase {
1010
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
1111
};
1212

13-
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
13+
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000);
1414

1515
}

ydb/core/external_sources/object_storage.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
#include "object_storage.h"
33
#include "validation_functions.h"
44

5+
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
56
#include <ydb/core/protos/external_sources.pb.h>
67
#include <ydb/core/protos/flat_scheme_op.pb.h>
8+
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
79
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
10+
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
811
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
912
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
1013
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
@@ -20,9 +23,10 @@ namespace NKikimr::NExternalSource {
2023
namespace {
2124

2225
struct TObjectStorageExternalSource : public IExternalSource {
23-
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
26+
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit)
2427
: HostnamePatterns(hostnamePatterns)
2528
, PathsLimit(pathsLimit)
29+
, ActorSystem(actorSystem)
2630
{}
2731

2832
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
@@ -255,6 +259,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
255259
return issues;
256260
}
257261

262+
struct TMetadataResult : NYql::NCommon::TOperationResult {
263+
std::shared_ptr<TMetadata> Metadata;
264+
};
265+
266+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
267+
Y_UNUSED(ActorSystem);
268+
// TODO: implement
269+
return NThreading::MakeFuture(std::move(meta));
270+
}
271+
272+
virtual bool CanLoadDynamicMetadata() const override {
273+
return false;
274+
}
275+
258276
private:
259277
static bool IsValidIntervalUnit(const TString& unit) {
260278
static constexpr std::array<std::string_view, 7> IntervalUnits = {
@@ -474,12 +492,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
474492
private:
475493
const std::vector<TRegExMatch> HostnamePatterns;
476494
const size_t PathsLimit;
495+
NActors::TActorSystem* ActorSystem = nullptr;
477496
};
478497

479498
}
480499

481-
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
482-
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);
500+
501+
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
502+
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit);
483503
}
484504

485505
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {

ydb/core/external_sources/object_storage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace NKikimr::NExternalSource {
1010

11-
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
11+
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit);
1212

1313
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
1414

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@ namespace NKikimr {
88

99
Y_UNIT_TEST_SUITE(ObjectStorageTest) {
1010
Y_UNIT_TEST(SuccessValidation) {
11-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
11+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
1212
NKikimrExternalSources::TSchema schema;
1313
NKikimrExternalSources::TGeneral general;
1414
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
1515
}
1616

1717
Y_UNIT_TEST(FailedCreate) {
18-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
18+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
1919
NKikimrExternalSources::TSchema schema;
2020
NKikimrExternalSources::TGeneral general;
2121
general.mutable_attributes()->insert({"a", "b"});
2222
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
2323
}
2424

2525
Y_UNIT_TEST(FailedValidation) {
26-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
26+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
2727
NKikimrExternalSources::TSchema schema;
2828
NKikimrExternalSources::TGeneral general;
2929
general.mutable_attributes()->insert({"projection.h", "b"});

ydb/core/grpc_services/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ PEERDIR(
115115
ydb/core/io_formats/ydb_dump
116116
ydb/core/kesus/tablet
117117
ydb/core/kqp/common
118+
ydb/core/kqp/session_actor
118119
ydb/core/protos
119120
ydb/core/scheme
120121
ydb/core/sys_view

ydb/core/kqp/gateway/behaviour/external_data_source/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ PEERDIR(
1010
ydb/services/metadata/abstract
1111
ydb/services/metadata/secret
1212
ydb/core/kqp/gateway/actors
13+
ydb/core/kqp/federated_query
1314
ydb/core/kqp/gateway/utils
1415
ydb/core/kqp/gateway/behaviour/tablestore/operations
1516
)

0 commit comments

Comments
 (0)