Skip to content

Commit 7aaad44

Browse files
authored
Merge e037928 into 4bce249
2 parents 4bce249 + e037928 commit 7aaad44

11 files changed

+472
-100
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1761,7 +1761,7 @@ class TKqpHost : public IKqpHost {
17611761

17621762
void Init(EKikimrQueryType queryType) {
17631763
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
1764-
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);
1764+
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry, ActorSystem);
17651765

17661766
ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
17671767
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();

ydb/core/kqp/host/kqp_host_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class IKqpRunner : public TThrRefBase {
265265

266266
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
267267
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
268-
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);
268+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TActorSystem* actorSystem);
269269

270270
TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
271271
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/kqp/opt/kqp_opt.h>
66
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
77
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
8+
#include <ydb/core/kqp/opt/kqp_columns_getter_transformer.h>
89
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
910
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
1011

@@ -137,7 +138,8 @@ class TKqpRunner : public IKqpRunner {
137138
public:
138139
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
139140
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
140-
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
141+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry,
142+
TActorSystem* actorSystem)
141143
: Gateway(gateway)
142144
, Cluster(cluster)
143145
, TypesCtx(*typesCtx)
@@ -149,6 +151,7 @@ class TKqpRunner : public IKqpRunner {
149151
sessionCtx->TablesPtr()))
150152
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
151153
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel)))
154+
, ActorSystem(actorSystem)
152155
{
153156
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
154157
}
@@ -297,6 +300,7 @@ class TKqpRunner : public IKqpRunner {
297300
.AddPostTypeAnnotation(/* forSubgraph */ true)
298301
.AddCommonOptimization()
299302
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
303+
.Add(CreateKqpColumnsGetterTransformer(Config, *typesCtx, SessionCtx->Tables(), Cluster, ActorSystem), "ColumnGetter")
300304
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
301305
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
302306
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
@@ -324,7 +328,7 @@ class TKqpRunner : public IKqpRunner {
324328
Config),
325329
"BuildPhysicalTxs")
326330
.Build(false));
327-
331+
328332
auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
329333
.AddServiceTransformers()
330334
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
@@ -399,15 +403,17 @@ class TKqpRunner : public IKqpRunner {
399403
TKqpProviderContext Pctx;
400404

401405
TAutoPtr<IGraphTransformer> Transformer;
406+
407+
TActorSystem* ActorSystem;
402408
};
403409

404410
} // namespace
405411

406412
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
407413
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
408-
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
414+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TActorSystem* actorSystem)
409415
{
410-
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
416+
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry, actorSystem);
411417
}
412418

413419
} // namespace NKqp
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
#include "kqp_columns_getter_transformer.h"
2+
3+
#include <ydb/library/yql/core/yql_expr_optimize.h>
4+
#include <ydb/core/statistics/service/service.h>
5+
#include <ydb/core/statistics/events.h>
6+
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
7+
#include <ydb/library/yql/core/yql_statistics.h>
8+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
9+
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
10+
#include <ydb/library/yql/utils/log/log.h>
11+
12+
namespace NKikimr::NKqp {
13+
14+
using namespace NThreading;
15+
using namespace NYql;
16+
17+
// Associates the arguments of every lambda child of `input` with the table
// recorded for the callable's first child (its input).
//
// Child 0 is treated as the callable input; children 1..N are inspected and
// only those that are lambdas with at least one argument are processed.
//  - If the input is a list, the j-th lambda argument receives the table
//    recorded for the j-th list item.
//  - Otherwise the first lambda argument receives the table recorded for the
//    input node itself.
// Nodes without a recorded table are looked up with operator[], so they end
// up mapped to a default (null) value; AfterLambdas() skips null entries.
void TKqpColumnsGetterTransformer::PropagateTableToLambdaArgument(const TExprNode::TPtr& input) {
    const size_t childrenCount = input->ChildrenSize();
    if (childrenCount < 2) {
        return;
    }

    const auto& callableInput = input->ChildRef(0);

    for (size_t i = 1; i < childrenCount; ++i) {
        auto maybeLambda = TExprBase(input->ChildRef(i));
        if (!maybeLambda.Maybe<TCoLambda>()) {
            continue;
        }

        auto lambda = maybeLambda.Cast<TCoLambda>();
        const size_t argsCount = lambda.Args().Size();
        if (argsCount == 0) {
            continue;
        }

        if (callableInput->IsList()) {
            // The list may hold more items than the lambda has arguments;
            // never index past the last lambda argument.
            const size_t itemsCount = callableInput->ChildrenSize();
            for (size_t j = 0; j < itemsCount && j < argsCount; ++j) {
                // Copy the source value first so the assignment does not depend
                // on a reference obtained before a possible insertion.
                auto table = TableByExprNode[callableInput->Child(j)];
                TableByExprNode[lambda.Args().Arg(j).Ptr()] = table;
            }
        } else {
            auto table = TableByExprNode[callableInput.Get()];
            TableByExprNode[lambda.Args().Arg(0).Ptr()] = table;
        }
    }
}
45+
46+
// Loads column statistics for the columns that are used in predicates of the query.
//
// Steps:
//  1. Leaves the expression untouched (`output = input`) and does nothing unless
//     the cost based optimization level is > 0 and column statistics are enabled.
//  2. Walks the expression and fills ColumnsByTableName (see AfterLambdas()).
//  3. Builds one COUNT_MIN_SKETCH statistics request for every collected column
//     that is not yet present in TypesCtx.ColumnStatisticsByTableName.
//  4. Registers a request handler actor, waits synchronously for the answer and
//     stores the received sketches in TypesCtx.ColumnStatisticsByTableName.
//
// Always returns TStatus::Ok: a failed statistics request is only logged.
IGraphTransformer::TStatus TKqpColumnsGetterTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
    Y_UNUSED(ctx);

    output = input;
    auto optLvl = Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel);
    auto enableColumnStats = Config->FeatureFlags.GetEnableColumnStatistics();
    if (!(optLvl > 0 && enableColumnStats)) {
        return IGraphTransformer::TStatus::Ok;
    }

    VisitExprLambdasLast(
        input,
        [&](const TExprNode::TPtr& node) {
            // The generic propagation is only a fallback for nodes that are
            // neither a table nor a stream lookup.
            if (!BeforeLambdas(node)) {
                BeforeLambdasUnmatched(node);
            }

            if (node->IsCallable()) {
                PropagateTableToLambdaArgument(node);
            }

            return true;
        },
        [&](const TExprNode::TPtr& node) {
            return AfterLambdas(node) || AfterLambdasUnmatched(node);
        }
    );

    if (ColumnsByTableName.empty()) {
        return IGraphTransformer::TStatus::Ok;
    }

    // Lets the response handler translate (path id, column tag) back to names.
    struct TTableMeta {
        TString TableName;
        THashMap<ui32, TString> ColumnNameByTag;
    };
    THashMap<TPathId, TTableMeta> tableMetaByPathId;

    // TODO: Add other statistics, not only COUNT_MIN_SKETCH.
    auto getStatisticsRequest = MakeHolder<NStat::TEvStatistics::TEvGetStatistics>();
    getStatisticsRequest->StatType = NKikimr::NStat::EStatType::COUNT_MIN_SKETCH;

    for (const auto& [table, columns]: ColumnsByTableName) {
        auto tableMeta = Tables.GetTable(Cluster, table).Metadata;
        auto& columnsMeta = tableMeta->Columns;

        auto pathId = TPathId(tableMeta->PathId.OwnerId(), tableMeta->PathId.TableId());
        for (const auto& column: columns) {
            // Statistics for this column have already been loaded.
            if (TypesCtx.ColumnStatisticsByTableName.contains(table) && TypesCtx.ColumnStatisticsByTableName[table]->Data.contains(column)) {
                continue;
            }

            // The collected name may not be a real column of the table. Check the
            // table metadata (not the collected set itself) and skip such names
            // instead of default-inserting a metadata entry for them.
            const auto columnMetaIt = columnsMeta.find(column);
            if (columnMetaIt == columnsMeta.end()) {
                YQL_CLOG(DEBUG, ProviderKikimr) << "Table: " + table + " doesn't contain " + column + " to request for column statistics";
                continue;
            }

            NKikimr::NStat::TRequest req;
            req.ColumnTag = columnMetaIt->second.Id;
            req.PathId = pathId;
            getStatisticsRequest->StatRequests.push_back(req);

            auto& requestedMeta = tableMetaByPathId[pathId];
            requestedMeta.TableName = table;
            requestedMeta.ColumnNameByTag[req.ColumnTag.value()] = column;
        }
    }

    if (getStatisticsRequest->StatRequests.empty()) {
        return IGraphTransformer::TStatus::Ok;
    }

    using TRequest = NStat::TEvStatistics::TEvGetStatistics;
    using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
    struct TResult : public NYql::IKikimrGateway::TGenericResult {
        THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
    };

    auto promise = NewPromise<TResult>();
    auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
            (TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
        if (!response.Success) {
            promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
            // The promise is fulfilled now; it must not be set a second time below.
            return;
        }

        THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;

        for (auto&& stat: response.StatResponses) {
            auto& meta = tableMetaByPathId[stat.Req.PathId];
            const auto& columnName = meta.ColumnNameByTag[stat.Req.ColumnTag.value()];
            auto& columnStatistics = columnStatisticsByTableName[meta.TableName].Data[columnName];
            columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
        }

        promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
    };
    auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
    IActor* requestHandler =
        new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
    auto actorId = ActorSystem
        ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
    Y_UNUSED(actorId);

    // Blocks the calling thread until the callback fulfils the promise.
    auto res = promise.GetFuture().GetValueSync();
    if (!res.Issues().Empty()) {
        TStringStream ss;
        res.Issues().PrintTo(ss);
        YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
        return IGraphTransformer::TStatus::Ok;
    }

    // NOTE(review): insert() presumably keeps an already existing entry for the
    // table, so statistics of newly requested columns would be dropped when the
    // table already has an entry - confirm against the container type.
    for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
        TypesCtx.ColumnStatisticsByTableName.insert(
            {std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
        );
    }

    return IGraphTransformer::TStatus::Ok;
}
161+
162+
// Records the table for nodes that directly denote one:
//  - a TKqpTable node is mapped to itself;
//  - a TKqpCnStreamLookup node is mapped to its Table() child.
// Returns true when the node was one of these kinds, false otherwise.
bool TKqpColumnsGetterTransformer::BeforeLambdas(const TExprNode::TPtr& input) {
    if (TKqpTable::Match(input.Get())) {
        TableByExprNode[input.Get()] = input.Get();
        return true;
    }

    if (auto streamLookup = TExprBase(input).Maybe<TKqpCnStreamLookup>()) {
        TableByExprNode[input.Get()] = streamLookup.Cast().Table().Ptr();
        return true;
    }

    return false;
}
175+
176+
// Fallback for nodes BeforeLambdas() did not recognise: the node inherits the
// table recorded for its first child that has an entry in TableByExprNode.
// Always returns true, whether or not such a child exists.
bool TKqpColumnsGetterTransformer::BeforeLambdasUnmatched(const TExprNode::TPtr& input) {
    for (const auto& child: input->Children()) {
        const auto known = TableByExprNode.find(child);
        if (known == TableByExprNode.end()) {
            continue;
        }

        // Copy the value before inserting the entry for `input`.
        auto table = known->second;
        TableByExprNode[input.Get()] = table;
        break;
    }

    return true;
}
186+
187+
// Collects the columns used by the predicate of a filter-like node.
//
// Handles TCoFilterBase nodes and TCoFlatMapBase nodes whose lambda body
// satisfies IsPredicateFlatMap(). For such a node the predicate is analysed with
// NDq::TPredicateSelectivityComputer, and every reported member whose node has
// a non-null table in TableByExprNode is added to ColumnsByTableName under that
// table's path. A "table.column" member name is reduced to "column".
// Returns true when the node was handled, false otherwise.
bool TKqpColumnsGetterTransformer::AfterLambdas(const TExprNode::TPtr& input) {
    const bool isFilter = TCoFilterBase::Match(input.Get());
    const bool isPredicateFlatMap =
        !isFilter
        && TCoFlatMapBase::Match(input.Get())
        && IsPredicateFlatMap(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body().Ref());

    if (!isFilter && !isPredicateFlatMap) {
        return false;
    }

    auto computer = NDq::TPredicateSelectivityComputer(nullptr, true);
    if (isFilter) {
        computer.Compute(TExprBase(input).Cast<TCoFilterBase>().Lambda().Body());
    } else {
        computer.Compute(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body());
    }

    auto usedMembers = computer.GetColumnStatsUsedMembers();
    for (const auto& item: usedMembers.Data) {
        auto memberNode = TExprBase(item.Member).Ptr();

        // Skip members whose origin table is unknown.
        const auto tableIt = TableByExprNode.find(memberNode);
        if (tableIt == TableByExprNode.end() || tableIt->second == nullptr) {
            continue;
        }

        auto table = TExprBase(tableIt->second).Cast<TKqpTable>().Path().StringValue();
        auto column = item.Member.Name().StringValue();

        // Strip the "table." prefix, keeping everything after the first dot.
        if (const size_t dotPos = column.find('.'); dotPos != TString::npos) {
            column = column.substr(dotPos + 1);
        }

        ColumnsByTableName[table].insert(std::move(column));
    }

    return true;
}
226+
227+
// Fallback for nodes AfterLambdas() did not handle: if the node has no entry in
// TableByExprNode yet, it inherits the table recorded for its first child that
// has one. Always returns true.
bool TKqpColumnsGetterTransformer::AfterLambdasUnmatched(const TExprNode::TPtr& input) {
    if (!TableByExprNode.contains(input.Get())) {
        for (const auto& child: input->Children()) {
            const auto known = TableByExprNode.find(child);
            if (known == TableByExprNode.end()) {
                continue;
            }

            // Copy the value before inserting the entry for `input`.
            auto table = known->second;
            TableByExprNode[input.Get()] = table;
            break;
        }
    }

    return true;
}
241+
242+
// Factory: creates a TKqpColumnsGetterTransformer from the given configuration,
// type annotation context, tables data, cluster name and actor system, and
// returns it as an owning IGraphTransformer pointer.
TAutoPtr<IGraphTransformer> CreateKqpColumnsGetterTransformer(
    const TKikimrConfiguration::TPtr& config,
    TTypeAnnotationContext& typesCtx,
    TKikimrTablesData& tables,
    TString cluster,
    TActorSystem* actorSystem
) {
    auto* columnsGetter = new TKqpColumnsGetterTransformer(config, typesCtx, tables, cluster, actorSystem);
    return THolder<IGraphTransformer>(columnsGetter);
}
251+
252+
} // end of NKikimr::NKqp

0 commit comments

Comments
 (0)