Skip to content

Commit ac530bd

Browse files
authored
AssumeUnique/AssumeDistinct optimizers (#10102)
1 parent 72f3cf1 commit ac530bd

File tree

17 files changed

+239
-181
lines changed

17 files changed

+239
-181
lines changed

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 24 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -393,31 +393,30 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, IOptimiz
393393
.Done();
394394
}
395395

396-
template <typename TMembersFilter>
397-
TExprBase DqPushMembersFilterToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
396+
template <typename TInputBaseCallable>
397+
TExprBase DqPushInputBaseCallableToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
398398
const TParentsMap& parentsMap, bool allowStageMultiUsage)
399399
{
400-
if (!node.Maybe<TMembersFilter>().Input().template Maybe<TDqCnUnionAll>()) {
400+
if (!node.Maybe<TInputBaseCallable>() || !TDqCnUnionAll::Match(&node.Ref().Head())) {
401401
return node;
402402
}
403403

404-
auto filter = node.Cast<TMembersFilter>();
405-
auto dqUnion = filter.Input().template Cast<TDqCnUnionAll>();
404+
auto dqUnion = TDqCnUnionAll(node.Ref().HeadPtr());
406405

407406
if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
408407
return node;
409408
}
410409

411410
if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
412-
return TExprBase(ctx.ChangeChild(*node.Raw(), TMembersFilter::idx_Input, std::move(connToPushableStage)));
411+
return TExprBase(ctx.ChangeChild(node.Ref(), 0, std::move(connToPushableStage)));
413412
}
414413

415-
auto lambda = Build<TCoLambda>(ctx, filter.Pos())
414+
auto lambda = Build<TCoLambda>(ctx, node.Pos())
416415
.Args({"stream"})
417-
.template Body<TMembersFilter>()
418-
.Input("stream")
419-
.Members(filter.Members())
420-
.Build()
416+
.Body<TExprApplier>()
417+
.Apply(node)
418+
.With(TExprBase(node.Ref().HeadPtr()), "stream")
419+
.Build()
421420
.Done();
422421

423422
auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
@@ -696,13 +695,25 @@ void DqPushLambdasToStagesUnionAll(std::vector<std::pair<TDqCnUnionAll, TCoLambd
696695
TExprBase DqPushSkipNullMembersToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
697696
const TParentsMap& parentsMap, bool allowStageMultiUsage)
698697
{
699-
return DqPushMembersFilterToStage<TCoSkipNullMembers>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
698+
return DqPushInputBaseCallableToStage<TCoSkipNullMembers>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
700699
}
701700

702701
TExprBase DqPushExtractMembersToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
703702
const TParentsMap& parentsMap, bool allowStageMultiUsage)
704703
{
705-
return DqPushMembersFilterToStage<TCoExtractMembers>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
704+
return DqPushInputBaseCallableToStage<TCoExtractMembers>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
705+
}
706+
707+
TExprBase DqPushAssumeDistinctToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
708+
const TParentsMap& parentsMap, bool allowStageMultiUsage)
709+
{
710+
return DqPushInputBaseCallableToStage<TCoAssumeDistinct>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
711+
}
712+
713+
TExprBase DqPushAssumeUniqueToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
714+
const TParentsMap& parentsMap, bool allowStageMultiUsage)
715+
{
716+
return DqPushInputBaseCallableToStage<TCoAssumeUnique>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
706717
}
707718

708719
TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {

ydb/library/yql/dq/opt/dq_opt_phy.h

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,12 @@ NNodes::TExprBase DqPushSkipNullMembersToStage(NNodes::TExprBase node, TExprCont
3131
NNodes::TExprBase DqPushExtractMembersToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
3232
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
3333

34+
NNodes::TExprBase DqPushAssumeDistinctToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
35+
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
36+
37+
NNodes::TExprBase DqPushAssumeUniqueToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
38+
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
39+
3440
NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
3541
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
3642

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
3030
AddHandler(0, &TDqReadWrap::Match, HNDL(BuildStageWithReadWrap));
3131
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
3232
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
33+
AddHandler(0, &TCoAssumeUnique::Match, HNDL(PushAssumeUniqueToStage<false>));
34+
AddHandler(0, &TCoAssumeDistinct::Match, HNDL(PushAssumeDistinctToStage<false>));
3335
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
3436
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
3537
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
@@ -68,6 +70,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
6870

6971
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
7072
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
73+
AddHandler(1, &TCoAssumeUnique::Match, HNDL(PushAssumeUniqueToStage<true>));
74+
AddHandler(1, &TCoAssumeDistinct::Match, HNDL(PushAssumeDistinctToStage<true>));
7175
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
7276
AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>));
7377
AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>));
@@ -118,6 +122,16 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
118122
return DqPushExtractMembersToStage(node, ctx, optCtx, *getParents(), IsGlobal);
119123
}
120124

125+
template <bool IsGlobal>
126+
TMaybeNode<TExprBase> PushAssumeDistinctToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
127+
return DqPushAssumeDistinctToStage(node, ctx, optCtx, *getParents(), IsGlobal);
128+
}
129+
130+
template <bool IsGlobal>
131+
TMaybeNode<TExprBase> PushAssumeUniqueToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
132+
return DqPushAssumeUniqueToStage(node, ctx, optCtx, *getParents(), IsGlobal);
133+
}
134+
121135
template <bool IsGlobal>
122136
TMaybeNode<TExprBase> BuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
123137
return DqBuildFlatmapStage(node, ctx, optCtx, *getParents(), IsGlobal);

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,8 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
5252
AddHandler(0, &TCoExtendBase::Match, HNDL(Extend));
5353
AddHandler(0, &TCoAssumeSorted::Match, HNDL(AssumeConstraints));
5454
AddHandler(0, &TCoAssumeConstraints::Match, HNDL(AssumeConstraints));
55+
AddHandler(0, &TCoAssumeUnique::Match, HNDL(AssumeConstraints));
56+
AddHandler(0, &TCoAssumeDistinct::Match, HNDL(AssumeConstraints));
5557
AddHandler(0, &TYtMapReduce::Match, HNDL(AddTrivialMapperForNativeYtTypes));
5658
AddHandler(0, &TYtDqWrite::Match, HNDL(YtDqWrite));
5759
AddHandler(0, &TYtDqProcessWrite::Match, HNDL(YtDqProcessWrite));

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_sort.cpp

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -462,18 +462,17 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::TopSort(TExprBase node,
462462
}
463463

464464

465-
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeConstraints(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
465+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeConstraints(TExprBase assume, TExprContext& ctx, const TGetParents& getParents) const {
466466
if (State_->Types->EvaluationInProgress || State_->PassiveExecution) {
467-
return node;
467+
return assume;
468468
}
469469

470-
auto assume = node.Cast<TCoInputBase>();
471-
auto input = assume.Input();
470+
auto input = TExprBase(assume.Ref().HeadPtr());
472471
if (!IsYtProviderInput(input)) {
473-
return node;
472+
return assume;
474473
}
475474

476-
auto sorted = node.Ref().GetConstraint<TSortedConstraintNode>();
475+
auto sorted = assume.Ref().GetConstraint<TSortedConstraintNode>();
477476

478477
auto maybeOp = input.Maybe<TYtOutput>().Operation();
479478
bool needSeparateOp = !maybeOp
@@ -493,12 +492,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeConstraints(TExpr
493492
}
494493
}
495494
}
496-
if (equalSort && maybeOp.Maybe<TYtSort>() && node.Ref().GetAllConstraints().size() == 1 /* only sort constraint */) {
495+
if (equalSort && maybeOp.Maybe<TYtSort>() && assume.Ref().GetAllConstraints().size() == 1 /* only sort constraint */) {
497496
return input;
498497
}
499498

500499
const TStructExprType* outItemType = nullptr;
501-
if (auto type = GetSequenceItemType(node, false, ctx)) {
500+
if (auto type = GetSequenceItemType(assume, false, ctx)) {
502501
outItemType = type->Cast<TStructExprType>();
503502
} else {
504503
return {};

ydb/library/yql/tests/sql/dq_file/part14/canondata/result.json

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1334,6 +1334,35 @@
13341334
}
13351335
],
13361336
"test.test[insert-select_relabel-default.txt-Results]": [],
1337+
"test.test[insert-unique_distinct_hints--Analyze]": [
1338+
{
1339+
"checksum": "82346eb438496d882b8c74301f1ae7c2",
1340+
"size": 29836,
1341+
"uri": "https://{canondata_backend}/1931696/59c974a5d18c41e65f27bd82416d6f8307cc1616/resource.tar.gz#test.test_insert-unique_distinct_hints--Analyze_/plan.txt"
1342+
},
1343+
{
1344+
"uri": "file://test.test_insert-unique_distinct_hints--Analyze_/extracted"
1345+
}
1346+
],
1347+
"test.test[insert-unique_distinct_hints--Debug]": [
1348+
{
1349+
"checksum": "518c7939eaedda363bffabb9ece44f10",
1350+
"size": 7473,
1351+
"uri": "https://{canondata_backend}/1931696/59c974a5d18c41e65f27bd82416d6f8307cc1616/resource.tar.gz#test.test_insert-unique_distinct_hints--Debug_/opt.yql_patched"
1352+
}
1353+
],
1354+
"test.test[insert-unique_distinct_hints--Plan]": [
1355+
{
1356+
"checksum": "09f3e1fc421d00f156cb829d2e9284a8",
1357+
"size": 23914,
1358+
"uri": "https://{canondata_backend}/1931696/59c974a5d18c41e65f27bd82416d6f8307cc1616/resource.tar.gz#test.test_insert-unique_distinct_hints--Plan_/plan.txt"
1359+
}
1360+
],
1361+
"test.test[insert-unique_distinct_hints--Results]": [
1362+
{
1363+
"uri": "file://test.test_insert-unique_distinct_hints--Results_/extracted"
1364+
}
1365+
],
13371366
"test.test[join-anyjoin_common_nodata_keys--Analyze]": [
13381367
{
13391368
"checksum": "db24769a8f98b92b567d5aad8e54f031",
Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,15 @@
1+
<tmp_path>/program.sql:<main>: Warning: Post type annotation
2+
3+
<tmp_path>/program.sql:<main>:13:26: Warning: At function: Unordered
4+
INSERT INTO Output7 WITH TRUNCATE SELECT /*+ unique(subkey value) */ key, value FROM Input;
5+
^
6+
<tmp_path>/program.sql:<main>:13:35: Warning: At function: AssumeUniqueHint
7+
INSERT INTO Output7 WITH TRUNCATE SELECT /*+ unique(subkey value) */ key, value FROM Input;
8+
^
9+
<tmp_path>/program.sql:<main>:13:35: Warning: Unique sql hint contains invalid column:
10+
Unique((subkey,value)) is not applicable to List<Struct<'key':String,'value':String>>
11+
INSERT INTO Output7 WITH TRUNCATE SELECT /*+ unique(subkey value) */ key, value FROM Input;
12+
^
13+
<tmp_path>/program.sql:<main>: Info: Optimization
14+
15+
<tmp_path>/program.sql:<main>: Info: DQ cannot execute the query. Cause: unsupported write of unique data

ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json

Lines changed: 0 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -2834,16 +2834,6 @@
28342834
}
28352835
],
28362836
"test.test[select-select_all_ordered-default.txt-Results]": [],
2837-
"test.test[select-unique_distinct_hints--Analyze]": [
2838-
{
2839-
"checksum": "9fa7fa98806742f3d0981dbfe8793441",
2840-
"size": 16307,
2841-
"uri": "https://{canondata_backend}/1600758/a3a753bb80f9458f9a8a4c3335cab227f87f92b5/resource.tar.gz#test.test_select-unique_distinct_hints--Analyze_/plan.txt"
2842-
},
2843-
{
2844-
"uri": "file://test.test_select-unique_distinct_hints--Analyze_/extracted"
2845-
}
2846-
],
28472837
"test.test[simple_columns-simple_columns_join_coalesce_bug8923-default.txt-Analyze]": [
28482838
{
28492839
"checksum": "b4dd508a329723c74293d80f0278c705",

ydb/library/yql/tests/sql/dq_file/part7/canondata/test.test_select-unique_distinct_hints--Analyze_/extracted

Lines changed: 0 additions & 46 deletions
This file was deleted.

0 commit comments

Comments
 (0)