Skip to content

Commit d4c45f7

Browse files
authored
KIKIMR-20539: pure pg_catalog selects (#950)
1 parent 6cf49bf commit d4c45f7

File tree

15 files changed

+196
-24
lines changed

15 files changed

+196
-24
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
2121
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
2222
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
23+
#include <ydb/library/yql/providers/pg/provider/yql_pg_provider_impl.h>
2324
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
2425
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
2526

@@ -1501,6 +1502,14 @@ class TKqpHost : public IKqpHost {
15011502
TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state));
15021503
}
15031504

1505+
void InitPgProvider() {
1506+
auto state = MakeIntrusive<NYql::TPgState>();
1507+
state->Types = TypesCtx.Get();
1508+
1509+
TypesCtx->AddDataSource(NYql::PgProviderName, NYql::CreatePgDataSource(state));
1510+
TypesCtx->AddDataSink(NYql::PgProviderName, NYql::CreatePgDataSink(state));
1511+
}
1512+
15041513
void Init(EKikimrQueryType queryType) {
15051514
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
15061515

@@ -1535,6 +1544,8 @@ class TKqpHost : public IKqpHost {
15351544
InitGenericProvider();
15361545
}
15371546

1547+
InitPgProvider();
1548+
15381549
TypesCtx->UdfResolver = CreateSimpleUdfResolver(FuncRegistry);
15391550
TypesCtx->TimeProvider = TAppData::TimeProvider;
15401551
TypesCtx->RandomProvider = TAppData::RandomProvider;

ydb/core/kqp/host/kqp_translate.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT
9393
settings.V0ForceDisable = false;
9494
settings.WarnOnV0 = false;
9595
settings.DefaultCluster = cluster;
96-
settings.ClusterMapping = {{cluster, TString(NYql::KikimrProviderName)}};
96+
settings.ClusterMapping = {
97+
{cluster, TString(NYql::KikimrProviderName)},
98+
{"pg_catalog", TString(NYql::PgProviderName)}
99+
};
97100
auto tablePathPrefix = kqpTablePathPrefix;
98101
if (!tablePathPrefix.empty()) {
99102
settings.PathPrefix = tablePathPrefix;

ydb/core/kqp/host/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/library/yql/providers/generic/provider
3030
ydb/library/yql/providers/result/provider
3131
ydb/library/yql/providers/s3/provider
32+
ydb/library/yql/providers/pg/provider
3233
)
3334

3435
YQL_LAST_ABI_VERSION()

ydb/core/kqp/provider/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ PEERDIR(
4747
ydb/library/yql/providers/common/provider
4848
ydb/library/yql/providers/common/schema/expr
4949
ydb/library/yql/providers/dq/expr_nodes
50+
ydb/library/yql/providers/pg/expr_nodes
5051
ydb/library/yql/providers/result/expr_nodes
5152
ydb/library/yql/providers/result/provider
5253
ydb/library/yql/sql

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ namespace {
5959
.Done();
6060

6161
astNode.Ptr()->SetTypeAnn(ctx.MakeType<TUnitExprType>());
62-
62+
astNode.Ptr()->SetState(TExprNode::EState::ConstrComplete);
6363
exec.Ptr()->ChildRef(TKiExecDataQuery::idx_Ast) = astNode.Ptr();
6464
}
6565

@@ -605,6 +605,15 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
605605

606606
if (input->Content() == "Result") {
607607
auto result = TMaybeNode<TResult>(input).Cast();
608+
609+
if (auto maybeNth = result.Input().Maybe<TCoNth>()) {
610+
if (auto maybeExecQuery = maybeNth.Tuple().Maybe<TCoRight>().Input().Maybe<TKiExecDataQuery>()) {
611+
input->SetState(TExprNode::EState::ExecutionComplete);
612+
input->SetResult(ctx.NewWorld(input->Pos()));
613+
return SyncOk();
614+
}
615+
}
616+
608617
NKikimrMiniKQL::TType resultType;
609618
TString program;
610619
TStatus status = GetLambdaBody(result.Input().Ptr(), resultType, ctx, program);

ydb/core/kqp/provider/yql_kikimr_opt_build.cpp

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/yql/core/yql_opt_utils.h>
77
#include <ydb/library/yql/utils/log/log.h>
88
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
9+
#include <ydb/library/yql/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
910
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
1011
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
1112

@@ -307,6 +308,16 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
307308
return false;
308309
}
309310

311+
bool IsPgRead(const TExprBase& node, TTypeAnnotationContext& types) {
312+
if (auto maybePgRead = node.Maybe<TPgTableContent>()) {
313+
auto dataSourceProviderIt = types.DataSourceMap.find(NYql::PgProviderName);
314+
if (dataSourceProviderIt != types.DataSourceMap.end()) {
315+
return true;
316+
}
317+
}
318+
return false;
319+
}
320+
310321
bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
311322
if (node.Ref().ChildrenSize() <= 1) {
312323
return false;
@@ -383,6 +394,12 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
383394
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
384395
}
385396

397+
if (IsPgRead(node, types)) {
398+
txRes.Ops.insert(node.Raw());
399+
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
400+
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
401+
}
402+
386403
if (auto maybeWrite = node.Maybe<TKiWriteTable>()) {
387404
auto write = maybeWrite.Cast();
388405
if (!checkDataSink(write.DataSink())) {
@@ -930,11 +947,21 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
930947
return ret;
931948
}
932949

933-
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
934-
if (!node.Maybe<TResFill>()) {
950+
TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprContext& ctx) {
951+
if (auto maybeCommit = node.Maybe<TCoCommit>()) {
952+
auto world = maybeCommit.Cast().World();
953+
if (!world.Maybe<TResFill>()) {
954+
return node.Ptr();
955+
} else {
956+
node = world;
957+
}
958+
} else {
935959
return node.Ptr();
936960
}
937961

962+
TKiExecDataQuerySettings execSettings;
963+
execSettings.Mode = KikimrCommitModeFlush(); /*because it is a pure query*/
964+
938965
auto resFill = node.Cast<TResFill>();
939966

940967
if (resFill.DelegatedSource().Value() != KikimrProviderName) {
@@ -957,6 +984,8 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
957984
.Build()
958985
.Operations()
959986
.Build()
987+
.Settings()
988+
.Build()
960989
.Done();
961990

962991
auto exec = Build<TKiExecDataQuery>(ctx, node.Pos())
@@ -968,8 +997,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
968997
.QueryBlocks()
969998
.Add({queryBlock})
970999
.Build()
971-
.Settings()
972-
.Build()
1000+
.Settings(execSettings.BuildNode(ctx, node.Pos()))
9731001
.Ast<TCoVoid>().Build()
9741002
.Done();
9751003

@@ -984,7 +1012,21 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
9841012
.Input(exec)
9851013
.Done();
9861014

987-
return ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
1015+
auto newResFill = ctx.ChangeChild(*ctx.ChangeChild(resFill.Ref(), 0, world.Ptr()), 3, data.Ptr());
1016+
auto resCommit = Build<TCoCommit>(ctx, node.Pos())
1017+
.World(newResFill)
1018+
.DataSink<TResultDataSink>()
1019+
.Build()
1020+
.Done();
1021+
1022+
return Build<TCoCommit>(ctx, node.Pos())
1023+
.World(resCommit)
1024+
.DataSink<TKiDataSink>()
1025+
.Category().Build(KikimrProviderName)
1026+
.Cluster().Build(cluster)
1027+
.Build()
1028+
.Settings(execSettings.BuildNode(ctx, node.Pos()))
1029+
.Done().Ptr();
9881030
}
9891031

9901032
TYdbOperation GetTableOp(const TKiWriteTable& write) {

ydb/core/kqp/provider/yql_kikimr_results.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/dynumber/dynumber.h>
55
#include <ydb/library/uuid/uuid.h>
66

7+
#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h>
78
#include <ydb/library/yql/providers/common/codec/yql_codec_results.h>
89
#include <ydb/library/yql/public/decimal/yql_decimal.h>
910

@@ -115,6 +116,24 @@ void WriteValueToYson(const TStringStream& stream, NCommon::TYsonResultWriter& w
115116
return;
116117
}
117118

119+
case NKikimrMiniKQL::ETypeKind::Pg:
120+
{
121+
if (value.GetValueValueCase() == NKikimrMiniKQL::TValue::kNullFlagValue) {
122+
writer.OnEntity();
123+
} else if (value.HasBytes()) {
124+
auto convert = NKikimr::NPg::PgNativeTextFromNativeBinary(
125+
value.GetBytes(), NKikimr::NPg::TypeDescFromPgTypeId(type.GetPg().Getoid())
126+
);
127+
YQL_ENSURE(!convert.Error, "Failed to convert pg value to text: " << *convert.Error);
128+
writer.OnStringScalar(convert.Str);
129+
} else if (value.HasText()) {
130+
writer.OnStringScalar(value.GetText());
131+
} else {
132+
YQL_ENSURE(false, "malformed pg value");
133+
}
134+
return;
135+
}
136+
118137
case NKikimrMiniKQL::ETypeKind::Optional:
119138
if (!value.HasOptional()) {
120139
writer.OnEntity();

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
513513
SetupLogLevelFromTestParam(NKikimrServices::KQP_NODE);
514514
SetupLogLevelFromTestParam(NKikimrServices::KQP_BLOBS_STORAGE);
515515
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
516+
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);
516517

517518
RunCall([this, domain = settings.DomainRoot]{
518519
this->Client->InitRootScheme(domain);

ydb/core/kqp/ut/pg/pg_catalog_ut.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
2+
#include <library/cpp/testing/unittest/registar.h>
3+
4+
namespace NKikimr {
5+
namespace NKqp {
6+
7+
using namespace NYdb;
8+
using namespace NYdb::NTable;
9+
10+
Y_UNIT_TEST_SUITE(PgCatalog) {
11+
Y_UNIT_TEST(PgType) {
12+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
13+
auto db = kikimr.GetQueryClient();
14+
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
15+
{
16+
auto result = db.ExecuteQuery(R"(
17+
select typname from pg_catalog.pg_type order by oid
18+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
19+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
20+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
21+
CompareYson(R"([
22+
["bool"];["bytea"];["char"];["name"];["int8"];["int2"];["int2vector"];["int4"];
23+
["regproc"];["text"];["oid"];["tid"];["xid"];["cid"];["oidvector"];["pg_ddl_command"];
24+
["pg_type"];["pg_attribute"];["pg_proc"];["pg_class"];["json"];["xml"];["pg_node_tree"];
25+
["table_am_handler"];["index_am_handler"];["point"];["lseg"];["path"];["box"];["polygon"];
26+
["line"];["cidr"];["float4"];["float8"];["unknown"];["circle"];["macaddr8"];["money"];
27+
["macaddr"];["inet"];["aclitem"];["bpchar"];["varchar"];["date"];["time"];["timestamp"];
28+
["timestamptz"];["interval"];["timetz"];["bit"];["varbit"];["numeric"];["refcursor"];
29+
["regprocedure"];["regoper"];["regoperator"];["regclass"];["regtype"];["record"];["cstring"];
30+
["any"];["anyarray"];["void"];["trigger"];["language_handler"];["internal"];["anyelement"];
31+
["_record"];["anynonarray"];["uuid"];["txid_snapshot"];["fdw_handler"];["pg_lsn"];["tsm_handler"];
32+
["pg_ndistinct"];["pg_dependencies"];["anyenum"];["tsvector"];["tsquery"];["gtsvector"];
33+
["regconfig"];["regdictionary"];["jsonb"];["anyrange"];["event_trigger"];["int4range"];["numrange"];
34+
["tsrange"];["tstzrange"];["daterange"];["int8range"];["jsonpath"];["regnamespace"];["regrole"];
35+
["regcollation"];["int4multirange"];["nummultirange"];["tsmultirange"];["tstzmultirange"];
36+
["datemultirange"];["int8multirange"];["anymultirange"];["anycompatiblemultirange"];
37+
["pg_brin_bloom_summary"];["pg_brin_minmax_multi_summary"];["pg_mcv_list"];["pg_snapshot"];["xid8"];
38+
["anycompatible"];["anycompatiblearray"];["anycompatiblenonarray"];["anycompatiblerange"]
39+
])", FormatResultSetYson(result.GetResultSet(0)));
40+
}
41+
}
42+
}
43+
44+
} // namespace NKqp
45+
} // namespace NKikimr

ydb/core/kqp/ut/pg/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
66

77
SRCS(
88
kqp_pg_ut.cpp
9+
pg_catalog_ut.cpp
910
)
1011

1112
PEERDIR(

0 commit comments

Comments
 (0)