Skip to content

Commit 37d80e8

Browse files
authored
Merge e37a22e into c66a55e
2 parents c66a55e + e37a22e commit 37d80e8

File tree

9 files changed

+435
-100
lines changed

9 files changed

+435
-100
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1761,7 +1761,7 @@ class TKqpHost : public IKqpHost {
17611761

17621762
void Init(EKikimrQueryType queryType) {
17631763
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
1764-
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);
1764+
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry, ActorSystem);
17651765

17661766
ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
17671767
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();

ydb/core/kqp/host/kqp_host_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class IKqpRunner : public TThrRefBase {
265265

266266
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
267267
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
268-
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);
268+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TActorSystem* actorSystem);
269269

270270
TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
271271
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/kqp/opt/kqp_opt.h>
66
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
77
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
8+
#include <ydb/core/kqp/opt/kqp_columns_getter_transformer.h>
89
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
910
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
1011

@@ -137,7 +138,8 @@ class TKqpRunner : public IKqpRunner {
137138
public:
138139
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
139140
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
140-
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
141+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry,
142+
TActorSystem* actorSystem)
141143
: Gateway(gateway)
142144
, Cluster(cluster)
143145
, TypesCtx(*typesCtx)
@@ -149,6 +151,7 @@ class TKqpRunner : public IKqpRunner {
149151
sessionCtx->TablesPtr()))
150152
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
151153
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel)))
154+
, ActorSystem(actorSystem)
152155
{
153156
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
154157
}
@@ -297,6 +300,7 @@ class TKqpRunner : public IKqpRunner {
297300
.AddPostTypeAnnotation(/* forSubgraph */ true)
298301
.AddCommonOptimization()
299302
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
303+
.Add(CreateKqpColumnsGetterTransformer(Config, SessionCtx->Tables(), Cluster, ActorSystem), "ColumnGetter")
300304
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
301305
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
302306
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
@@ -324,7 +328,7 @@ class TKqpRunner : public IKqpRunner {
324328
Config),
325329
"BuildPhysicalTxs")
326330
.Build(false));
327-
331+
328332
auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
329333
.AddServiceTransformers()
330334
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
@@ -399,15 +403,17 @@ class TKqpRunner : public IKqpRunner {
399403
TKqpProviderContext Pctx;
400404

401405
TAutoPtr<IGraphTransformer> Transformer;
406+
407+
TActorSystem* ActorSystem;
402408
};
403409

404410
} // namespace
405411

406412
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
407413
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
408-
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
414+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TActorSystem* actorSystem)
409415
{
410-
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
416+
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry, actorSystem);
411417
}
412418

413419
} // namespace NKqp
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
#include "kqp_columns_getter_transformer.h"
2+
3+
#include <ydb/library/yql/core/yql_expr_optimize.h>
4+
#include <ydb/core/statistics/service/service.h>
5+
#include <ydb/core/statistics/events.h>
6+
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
7+
#include <ydb/library/yql/core/yql_statistics.h>
8+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
9+
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
10+
11+
namespace NKikimr::NKqp {
12+
13+
using namespace NThreading;
14+
using namespace NYql;
15+
16+
void TKqpColumnsGetterTransformer::PropagateTableToLambdaArgument(const TExprNode::TPtr& input) {
17+
if (input->ChildrenSize() < 2) {
18+
return;
19+
}
20+
21+
auto callableInput = input->ChildRef(0);
22+
23+
24+
for (size_t i = 1; i < input->ChildrenSize(); ++i) {
25+
auto maybeLambda = TExprBase(input->ChildRef(i));
26+
if (!maybeLambda.Maybe<TCoLambda>()) {
27+
continue;
28+
}
29+
30+
auto lambda = maybeLambda.Cast<TCoLambda>();
31+
if (!lambda.Args().Size()){
32+
continue;
33+
}
34+
35+
if (callableInput->IsList()){
36+
for (size_t j = 0; j < callableInput->ChildrenSize(); ++j){
37+
TableByExprNode[lambda.Args().Arg(j).Ptr()] = TableByExprNode[callableInput->Child(j)];
38+
}
39+
} else {
40+
TableByExprNode[lambda.Args().Arg(0).Ptr()] = TableByExprNode[callableInput.Get()];
41+
}
42+
}
43+
}
44+
45+
IGraphTransformer::TStatus TKqpColumnsGetterTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
46+
Y_UNUSED(ctx);
47+
48+
output = input;
49+
auto optLvl = Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel);
50+
auto enableColumnStats = Config->FeatureFlags.GetEnableColumnStatistics();
51+
if (!(optLvl > 0 && enableColumnStats)) {
52+
return IGraphTransformer::TStatus::Ok;
53+
}
54+
55+
VisitExprLambdasLast(
56+
input,
57+
[&](const TExprNode::TPtr& input) {
58+
BeforeLambdas(input) || BeforeLambdasUnmatched(input);
59+
60+
if (input->IsCallable()) {
61+
PropagateTableToLambdaArgument(input);
62+
}
63+
64+
return true;
65+
},
66+
[&](const TExprNode::TPtr& input) {
67+
return AfterLambdas(input) || AfterLambdasUnmatched(input);
68+
}
69+
);
70+
71+
struct TTableMeta {
72+
TString TableName;
73+
THashMap<ui32, TString> ColumnNameByTag;
74+
};
75+
THashMap<TPathId, TTableMeta> tableMetaByPathId;
76+
77+
// TODO: Add other statistics, not only COUNT_MIN_SKETCH.
78+
auto getStatisticsRequest = MakeHolder<NStat::TEvStatistics::TEvGetStatistics>();
79+
getStatisticsRequest->StatType = NKikimr::NStat::EStatType::COUNT_MIN_SKETCH;
80+
81+
for (const auto& [table, columns]: ColumnsByTableName) {
82+
auto tableMeta = Tables.GetTable(Cluster, table).Metadata;
83+
auto& columnsMeta = tableMeta->Columns;
84+
85+
auto pathId = TPathId(tableMeta->PathId.OwnerId(), tableMeta->PathId.TableId());
86+
for (const auto& column: columns) {
87+
Y_ENSURE(columns.contains(column), "There is no " + column + " in column meta!");
88+
89+
NKikimr::NStat::TRequest req;
90+
req.ColumnTag = columnsMeta[column].Id;
91+
req.PathId = pathId;
92+
getStatisticsRequest->StatRequests.push_back(req);
93+
94+
tableMetaByPathId[pathId].TableName = table;
95+
tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
96+
}
97+
}
98+
99+
using TRequest = NStat::TEvStatistics::TEvGetStatistics;
100+
using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
101+
struct TResult : public NYql::IKikimrGateway::TGenericResult {
102+
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
103+
};
104+
105+
auto promise = NewPromise<TResult>();
106+
auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
107+
(TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
108+
bool isOk = response.Success;
109+
Y_ENSURE(isOk);
110+
111+
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
112+
113+
for (auto&& stat: response.StatResponses) {
114+
auto meta = tableMetaByPathId[stat.Req.PathId];
115+
auto columnName = meta.ColumnNameByTag[stat.Req.ColumnTag.value()];
116+
auto& columnStatistics = columnStatisticsByTableName[meta.TableName].Data[columnName];
117+
columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
118+
}
119+
120+
promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
121+
};
122+
auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
123+
IActor* requestHandler =
124+
new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
125+
auto actorId = ActorSystem
126+
->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
127+
Y_UNUSED(actorId);
128+
129+
auto columnStatisticsByTableName = promise.GetFuture().GetValueSync();
130+
131+
return IGraphTransformer::TStatus::Ok;
132+
}
133+
134+
bool TKqpColumnsGetterTransformer::BeforeLambdas(const TExprNode::TPtr& input) {
135+
bool matched = true;
136+
137+
if (TKqpTable::Match(input.Get())) {
138+
TableByExprNode[input.Get()] = input.Get();
139+
} else if (auto maybeStreamLookup = TExprBase(input).Maybe<TKqpCnStreamLookup>()) {
140+
TableByExprNode[input.Get()] = maybeStreamLookup.Cast().Table().Ptr();
141+
} else {
142+
matched = false;
143+
}
144+
145+
return matched;
146+
}
147+
148+
bool TKqpColumnsGetterTransformer::BeforeLambdasUnmatched(const TExprNode::TPtr& input) {
149+
for (const auto& node: input->Children()) {
150+
if (TableByExprNode.contains(node)) {
151+
TableByExprNode[input.Get()] = TableByExprNode[node];
152+
return true;
153+
}
154+
}
155+
156+
return true;
157+
}
158+
159+
bool TKqpColumnsGetterTransformer::AfterLambdas(const TExprNode::TPtr& input) {
160+
bool matched = true;
161+
162+
if (
163+
TCoFilterBase::Match(input.Get()) ||
164+
TCoFlatMapBase::Match(input.Get()) && IsPredicateFlatMap(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body().Ref())
165+
) {
166+
auto computer = NDq::TPredicateSelectivityComputer(nullptr, true);
167+
168+
if (TCoFilterBase::Match(input.Get())) {
169+
computer.Compute(TExprBase(input).Cast<TCoFilterBase>().Lambda().Body());
170+
} else if (TCoFlatMapBase::Match(input.Get())) {
171+
computer.Compute(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body());
172+
} else {
173+
Y_ENSURE(false);
174+
}
175+
176+
auto columnStatsUsedMembers = computer.GetColumnStatsUsedMembers();
177+
for (const auto& item: columnStatsUsedMembers.Data) {
178+
auto exprNode = TExprBase(item.Member).Ptr();
179+
if (!TableByExprNode.contains(exprNode) || TableByExprNode[exprNode] == nullptr) {
180+
continue;
181+
}
182+
183+
auto table = TExprBase(TableByExprNode[exprNode]).Cast<TKqpTable>().Path().StringValue();
184+
auto column = item.Member.Name().StringValue();
185+
size_t pointPos = column.find('.'); // table.column
186+
if (pointPos != TString::npos) {
187+
column = column.substr(pointPos + 1);
188+
}
189+
190+
ColumnsByTableName[table].insert(std::move(column));
191+
}
192+
} else {
193+
matched = false;
194+
}
195+
196+
return matched;
197+
}
198+
199+
bool TKqpColumnsGetterTransformer::AfterLambdasUnmatched(const TExprNode::TPtr& input) {
200+
if (TableByExprNode.contains(input.Get())) {
201+
return true;
202+
}
203+
204+
for (const auto& node: input->Children()) {
205+
if (TableByExprNode.contains(node)) {
206+
TableByExprNode[input.Get()] = TableByExprNode[node];
207+
return true;
208+
}
209+
}
210+
211+
return true;
212+
}
213+
214+
TAutoPtr<IGraphTransformer> CreateKqpColumnsGetterTransformer(
215+
const TKikimrConfiguration::TPtr& config,
216+
TKikimrTablesData& tables,
217+
TString cluster,
218+
TActorSystem* actorSystem
219+
) {
220+
return THolder<IGraphTransformer>(new TKqpColumnsGetterTransformer(config, tables, cluster, actorSystem));
221+
}
222+
223+
} // end of NKikimr::NKqp
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#pragma once
2+
3+
#include <ydb/core/kqp/common/kqp_yql.h>
4+
#include <ydb/library/yql/core/yql_graph_transformer.h>
5+
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
6+
#include <ydb/library/yql/core/yql_opt_utils.h>
7+
#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
8+
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
9+
10+
namespace NKikimr::NKqp {
11+
12+
using namespace NYql;
13+
using namespace NYql::NNodes;
14+
15+
class TKqpColumnsGetterTransformer : public TSyncTransformerBase {
16+
public:
17+
TKqpColumnsGetterTransformer(
18+
const TKikimrConfiguration::TPtr& config,
19+
TKikimrTablesData& tables,
20+
TString cluster,
21+
TActorSystem* actorSystem
22+
)
23+
: Config(config)
24+
, Tables(tables)
25+
, Cluster(cluster)
26+
, ActorSystem(actorSystem)
27+
{}
28+
29+
// Main method of the transformer
30+
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
31+
32+
void Rewind() override {}
33+
34+
~TKqpColumnsGetterTransformer() override = default;
35+
36+
private:
37+
bool BeforeLambdas(const TExprNode::TPtr& input);
38+
39+
bool BeforeLambdasUnmatched(const TExprNode::TPtr& input);
40+
41+
void PropagateTableToLambdaArgument(const TExprNode::TPtr& input);
42+
43+
bool AfterLambdas(const TExprNode::TPtr& input);
44+
45+
bool AfterLambdasUnmatched(const TExprNode::TPtr& input);
46+
47+
private:
48+
THashMap<TExprNode::TPtr, TExprNode::TPtr> TableByExprNode;
49+
THashMap<TString, THashSet<TString>> ColumnsByTableName;
50+
51+
const TKikimrConfiguration::TPtr& Config;
52+
TKikimrTablesData& Tables;
53+
TString Cluster;
54+
TActorSystem* ActorSystem;
55+
};
56+
57+
TAutoPtr<IGraphTransformer> CreateKqpColumnsGetterTransformer(
58+
const TKikimrConfiguration::TPtr& config,
59+
TKikimrTablesData& tables,
60+
TString cluster,
61+
TActorSystem* actorSystem
62+
);
63+
64+
} // end of NKikimr::NKqp namespace

ydb/core/kqp/opt/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ SRCS(
1212
kqp_query_blocks_transformer.cpp
1313
kqp_query_plan.cpp
1414
kqp_statistics_transformer.cpp
15+
kqp_columns_getter_transformer.cpp
1516
kqp_constant_folding_transformer.cpp
1617
)
1718

0 commit comments

Comments
 (0)