Skip to content

Commit 94b8402

Browse files
authored
Merge cf09e55 into 7ed98e4
2 parents 7ed98e4 + cf09e55 commit 94b8402

File tree

4 files changed

+198
-0
lines changed

4 files changed

+198
-0
lines changed

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/kqp/opt/kqp_opt.h>
66
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
77
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
8+
#include <ydb/core/kqp/opt/kqp_columns_getter_transformer.h>
89
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
910
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
1011

@@ -297,6 +298,7 @@ class TKqpRunner : public IKqpRunner {
297298
.AddPostTypeAnnotation(/* forSubgraph */ true)
298299
.AddCommonOptimization()
299300
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
301+
.Add(CreateKqpColumnsGetterTransformer(Config), "ColumnGetter")
300302
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
301303
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
302304
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
@@ -331,6 +333,7 @@ class TKqpRunner : public IKqpRunner {
331333
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
332334
.AddPostTypeAnnotation()
333335
.Add(CreateKqpBuildPhysicalQueryTransformer(OptimizeCtx, BuildQueryCtx), "BuildPhysicalQuery")
336+
.Add(CreateKqpColumnsGetterTransformer(Config), "ColumnGetter")
334337
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
335338
.Build(false);
336339

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
#include "kqp_columns_getter_transformer.h"
2+
3+
#include <ydb/library/yql/core/yql_expr_optimize.h>
4+
5+
namespace NKikimr::NKqp {
6+
7+
void TKqpColumnsGetterTransformer::PropagateTableToLambdaArgument(const TExprNode::TPtr& input) {
8+
if (input->ChildrenSize() < 2) {
9+
return;
10+
}
11+
12+
auto callableInput = input->ChildRef(0);
13+
14+
15+
for (size_t i = 1; i < input->ChildrenSize(); ++i) {
16+
auto maybeLambda = TExprBase(input->ChildRef(i));
17+
if (!maybeLambda.Maybe<TCoLambda>()) {
18+
continue;
19+
}
20+
21+
auto lambda = maybeLambda.Cast<TCoLambda>();
22+
if (!lambda.Args().Size()){
23+
continue;
24+
}
25+
26+
if (callableInput->IsList()){
27+
for (size_t j = 0; j < callableInput->ChildrenSize(); ++j){
28+
TableByExprNode[lambda.Args().Arg(j).Ptr()] = TableByExprNode[callableInput->Child(j)];
29+
}
30+
} else {
31+
TableByExprNode[lambda.Args().Arg(0).Ptr()] = TableByExprNode[callableInput.Get()];
32+
}
33+
}
34+
}
35+
36+
IGraphTransformer::TStatus TKqpColumnsGetterTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
37+
Y_UNUSED(ctx);
38+
39+
output = input;
40+
41+
// if (Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel) == 0) {
42+
// return IGraphTransformer::TStatus::Ok;
43+
// }
44+
45+
VisitExprLambdasLast(
46+
input,
47+
[&](const TExprNode::TPtr& input) {
48+
BeforeLambdas(input) || BeforeLambdasUnmatched(input);
49+
50+
if (input->IsCallable()) {
51+
PropagateTableToLambdaArgument(input);
52+
}
53+
54+
return true;
55+
},
56+
[&](const TExprNode::TPtr& input) {
57+
return AfterLambdas(input) || AfterLambdasUnmatched(input);
58+
}
59+
);
60+
61+
return IGraphTransformer::TStatus::Ok;
62+
}
63+
64+
bool TKqpColumnsGetterTransformer::BeforeLambdas(const TExprNode::TPtr& input) {
65+
bool matched = true;
66+
67+
if (TKqpTable::Match(input.Get())) {
68+
TableByExprNode[input.Get()] = input.Get();
69+
} else {
70+
matched = false;
71+
}
72+
73+
return matched;
74+
}
75+
76+
bool TKqpColumnsGetterTransformer::BeforeLambdasUnmatched(const TExprNode::TPtr& input) {
77+
for (const auto& node: input->Children()) {
78+
if (TableByExprNode.contains(node)) {
79+
TableByExprNode[input.Get()] = TableByExprNode[node];
80+
return true;
81+
}
82+
}
83+
84+
return true;
85+
}
86+
87+
bool TKqpColumnsGetterTransformer::AfterLambdas(const TExprNode::TPtr& input) {
88+
bool matched = true;
89+
90+
if (
91+
TCoFilterBase::Match(input.Get()) ||
92+
TCoFlatMapBase::Match(input.Get()) && IsPredicateFlatMap(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body().Ref())
93+
) {
94+
VisitExpr(
95+
input->Child(1),
96+
[this](const TExprNode::TPtr& input) -> bool {
97+
if (TCoMember::Match(input.Get())) {
98+
auto member = TExprBase(input).Cast<TCoMember>();
99+
100+
if (!TableByExprNode.contains(input.Get()) || TableByExprNode[input.Get()] == nullptr) {
101+
return true;
102+
}
103+
auto table = TExprBase(TableByExprNode[input.Get()]).Cast<TKqpTable>().Path().StringValue();
104+
auto column = member.Name().StringValue();
105+
size_t pointPos = column.find('.'); // table.column
106+
if (pointPos != TString::npos) {
107+
column = column.substr(pointPos + 1);
108+
}
109+
110+
ColumnsByTableName[table].insert(std::move(column));
111+
}
112+
113+
return true;
114+
}
115+
);
116+
} else {
117+
matched = false;
118+
}
119+
120+
return matched;
121+
}
122+
123+
bool TKqpColumnsGetterTransformer::AfterLambdasUnmatched(const TExprNode::TPtr& input) {
124+
if (TableByExprNode.contains(input.Get())) {
125+
return true;
126+
}
127+
128+
for (const auto& node: input->Children()) {
129+
if (TableByExprNode.contains(node)) {
130+
TableByExprNode[input.Get()] = TableByExprNode[node];
131+
return true;
132+
}
133+
}
134+
135+
return true;
136+
}
137+
138+
TAutoPtr<IGraphTransformer> CreateKqpColumnsGetterTransformer(
139+
const TKikimrConfiguration::TPtr& config
140+
) {
141+
return THolder<IGraphTransformer>(new TKqpColumnsGetterTransformer(config));
142+
}
143+
144+
} // end of NKikimr::NKqp
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#pragma once
2+
3+
#include <ydb/core/kqp/common/kqp_yql.h>
4+
#include <ydb/library/yql/core/yql_graph_transformer.h>
5+
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
6+
#include <ydb/library/yql/core/yql_opt_utils.h>
7+
#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
8+
9+
namespace NKikimr::NKqp {
10+
11+
using namespace NYql;
12+
using namespace NYql::NNodes;
13+
14+
class TKqpColumnsGetterTransformer : public TSyncTransformerBase {
15+
public:
16+
TKqpColumnsGetterTransformer(
17+
const TKikimrConfiguration::TPtr&
18+
)
19+
// : Config(config)
20+
{}
21+
22+
// Main method of the transformer
23+
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
24+
25+
void Rewind() override {}
26+
27+
~TKqpColumnsGetterTransformer() override = default;
28+
29+
private:
30+
bool BeforeLambdas(const TExprNode::TPtr& input);
31+
32+
bool BeforeLambdasUnmatched(const TExprNode::TPtr& input);
33+
34+
void PropagateTableToLambdaArgument(const TExprNode::TPtr& input);
35+
36+
bool AfterLambdas(const TExprNode::TPtr& input);
37+
38+
bool AfterLambdasUnmatched(const TExprNode::TPtr& input);
39+
40+
private:
41+
THashMap<TExprNode::TPtr, TExprNode::TPtr> TableByExprNode;
42+
THashMap<TString, THashSet<TString>> ColumnsByTableName;
43+
// const TKikimrConfiguration::TPtr& Config;
44+
};
45+
46+
TAutoPtr<IGraphTransformer> CreateKqpColumnsGetterTransformer(
47+
const TKikimrConfiguration::TPtr& config
48+
);
49+
50+
} // end of NKikimr::NKqp namespace

ydb/core/kqp/opt/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ SRCS(
1212
kqp_query_blocks_transformer.cpp
1313
kqp_query_plan.cpp
1414
kqp_statistics_transformer.cpp
15+
kqp_columns_getter_transformer.cpp
1516
kqp_constant_folding_transformer.cpp
1617
)
1718

0 commit comments

Comments
 (0)