Skip to content

Commit b567e12

Browse files
authored
Implemented two pass window functions (#5247)
1 parent 42b4fda commit b567e12

File tree

43 files changed

+968
-63
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+968
-63
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
193193
}
194194

195195
TMaybeNode<TExprBase> ExpandWindowFunctions(TExprBase node, TExprContext& ctx) {
196-
TExprBase output = DqExpandWindowFunctions(node, ctx, true);
196+
TExprBase output = DqExpandWindowFunctions(node, ctx, TypesCtx, true);
197197
DumpAppliedRule("ExpandWindowFunctions", node.Ptr(), output.Ptr(), ctx);
198198
return output;
199199
}

ydb/library/yql/core/common_opt/yql_co_flow2.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,22 +1758,32 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
17581758
};
17591759

17601760
map["SessionWindowTraits"] = map["SortTraits"] = map["Lag"] = map["Lead"] = map["RowNumber"] = map["Rank"] = map["DenseRank"] =
1761+
map["CumeDist"] = map["PercentRank"] = map["NTile"] =
17611762
[](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx)
17621763
{
17631764
auto structType = node->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()
17641765
->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
1765-
if (node->IsCallable("RowNumber")) {
1766+
if (node->IsCallable({"RowNumber", "CumeDist", "NTile"})) {
17661767
if (structType->GetSize() == 0) {
17671768
return node;
17681769
}
17691770

17701771
auto subsetType = ctx.MakeType<TListExprType>(ctx.MakeType<TStructExprType>(TVector<const TItemExprType*>()));
17711772
YQL_CLOG(DEBUG, Core) << "FieldSubset for " << node->Content();
1772-
return ctx.Builder(node->Pos())
1773-
.Callable(node->Content())
1774-
.Add(0, ExpandType(node->Pos(), *subsetType, ctx))
1775-
.Seal()
1776-
.Build();
1773+
if (node->IsCallable("NTile")) {
1774+
return ctx.Builder(node->Pos())
1775+
.Callable(node->Content())
1776+
.Add(0, ExpandType(node->Pos(), *subsetType, ctx))
1777+
.Add(1, node->TailPtr())
1778+
.Seal()
1779+
.Build();
1780+
} else {
1781+
return ctx.Builder(node->Pos())
1782+
.Callable(node->Content())
1783+
.Add(0, ExpandType(node->Pos(), *subsetType, ctx))
1784+
.Seal()
1785+
.Build();
1786+
}
17771787
}
17781788

17791789
TSet<ui32> lambdaIndexes;

ydb/library/yql/core/common_opt/yql_co_pgselect.cpp

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2691,17 +2691,30 @@ TExprNode::TPtr BuildWindows(TPositionHandle pos, const TExprNode::TPtr& list, c
26912691
if (isAgg) {
26922692
value = BuildAggregationTraits(pos, true, "", p, listTypeNode, &aggId, ctx, optCtx);
26932693
} else {
2694-
if (name == "row_number") {
2694+
if (name == "row_number" || name == "cume_dist") {
26952695
value = ctx.Builder(pos)
2696-
.Callable("RowNumber")
2696+
.Callable(name == "row_number" ? "RowNumber" : "CumeDist")
26972697
.Callable(0, "TypeOf")
2698-
.Add(0, list)
2698+
.Add(0, list)
2699+
.Seal()
2700+
.Seal()
2701+
.Build();
2702+
} else if (name == "ntile") {
2703+
value = ctx.Builder(pos)
2704+
.Callable("NTile")
2705+
.Callable(0, "TypeOf")
2706+
.Add(0, list)
2707+
.Seal()
2708+
.Callable(1, "Unwrap")
2709+
.Callable(0, "FromPg")
2710+
.Add(0, p.first->ChildPtr(3))
2711+
.Seal()
26992712
.Seal()
27002713
.Seal()
27012714
.Build();
2702-
} else if (name == "rank" || name == "dense_rank") {
2715+
} else if (name == "rank" || name == "dense_rank" || name == "percent_rank") {
27032716
value = ctx.Builder(pos)
2704-
.Callable((name == "rank") ? "Rank" : "DenseRank")
2717+
.Callable((name == "rank") ? "Rank" : (name == "dense_rank" ? "DenseRank" : "PercentRank"))
27052718
.Callable(0, "TypeOf")
27062719
.Add(0, list)
27072720
.Seal()
@@ -2804,6 +2817,32 @@ TExprNode::TPtr BuildWindows(TPositionHandle pos, const TExprNode::TPtr& list, c
28042817
.Seal()
28052818
.Seal()
28062819
.Build();
2820+
} else if (node->Head().Content() == "ntile") {
2821+
ret = ctx.Builder(node->Pos())
2822+
.Callable("ToPg")
2823+
.Callable(0, "SafeCast")
2824+
.Add(0, ret)
2825+
.Atom(1, "Int32")
2826+
.Seal()
2827+
.Seal()
2828+
.Build();
2829+
} else if (node->Head().Content() == "cume_dist" || node->Head().Content() == "percent_rank") {
2830+
if (node->Head().Content() == "percent_rank") {
2831+
ret = ctx.Builder(node->Pos())
2832+
.Callable("Nanvl")
2833+
.Add(0, ret)
2834+
.Callable(1, "Double")
2835+
.Atom(0, "0.0")
2836+
.Seal()
2837+
.Seal()
2838+
.Build();
2839+
}
2840+
2841+
ret = ctx.Builder(node->Pos())
2842+
.Callable("ToPg")
2843+
.Add(0, ret)
2844+
.Seal()
2845+
.Build();
28072846
}
28082847

28092848
return ret;

ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8217,9 +8217,6 @@ struct TPeepHoleRules {
82178217
{"FoldMap", &CleckClosureOnUpperLambdaOverList<2U>},
82188218
{"Fold1Map", &CleckClosureOnUpperLambdaOverList<1U, 2U>},
82198219
{"Chain1Map", &CleckClosureOnUpperLambdaOverList<1U, 2U>},
8220-
{"CalcOverWindow", &ExpandCalcOverWindow},
8221-
{"CalcOverSessionWindow", &ExpandCalcOverWindow},
8222-
{"CalcOverWindowGroup", &ExpandCalcOverWindow},
82238220
{"PartitionsByKeys", &ExpandPartitionsByKeys},
82248221
{"DictItems", &MapForOptionalContainer},
82258222
{"DictKeys", &MapForOptionalContainer},
@@ -8283,7 +8280,10 @@ struct TPeepHoleRules {
82838280
{"AggregateFinalize", &ExpandAggregatePeephole},
82848281
{"CostsOf", &ExpandCostsOf},
82858282
{"JsonQuery", &ExpandJsonQuery},
8286-
{"MatchRecognize", &ExpandMatchRecognize}
8283+
{"MatchRecognize", &ExpandMatchRecognize},
8284+
{"CalcOverWindow", &ExpandCalcOverWindow},
8285+
{"CalcOverSessionWindow", &ExpandCalcOverWindow},
8286+
{"CalcOverWindowGroup", &ExpandCalcOverWindow},
82878287
};
82888288

82898289
const TPeepHoleOptimizerMap SimplifyStageRules = {

ydb/library/yql/core/services/yql_lineage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,9 +723,9 @@ class TLineageScanner {
723723
const auto& list = f->Child(i);
724724
auto field = list->Head().Content();
725725
auto& res = (*lineage.Fields)[field];
726-
if (list->Tail().IsCallable("RowNumber")) {
726+
if (list->Tail().IsCallable({"RowNumber","CumeDist","NTile"})) {
727727
continue;
728-
} else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank"})) {
728+
} else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank","PercentRank"})) {
729729
const auto& lambda = list->Tail().Child(1);
730730
bool produceStruct = list->Tail().IsCallable({"Lag","Lead"});
731731
MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res, produceStruct);

ydb/library/yql/core/type_ann/type_ann_core.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12315,6 +12315,9 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
1231512315
Functions["RowNumber"] = &WinRowNumberWrapper;
1231612316
Functions["Rank"] = &WinRankWrapper;
1231712317
Functions["DenseRank"] = &WinRankWrapper;
12318+
Functions["PercentRank"] = &WinRankWrapper;
12319+
Functions["CumeDist"] = &WinRowNumberWrapper;
12320+
Functions["NTile"] = &WinNTileWrapper;
1231812321
Functions["Ascending"] = &PresortWrapper;
1231912322
Functions["Descending"] = &PresortWrapper;
1232012323
Functions["IsKeySwitch"] = &IsKeySwitchWrapper;

ydb/library/yql/core/type_ann/type_ann_list.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ namespace {
540540

541541
const auto paramName = func->Child(0)->Content();
542542
const auto calcSpec = func->Child(1);
543-
YQL_ENSURE(calcSpec->IsCallable({"Lag", "Lead", "RowNumber", "Rank", "DenseRank", "WindowTraits"}));
543+
YQL_ENSURE(calcSpec->IsCallable({"Lag", "Lead", "RowNumber", "Rank", "DenseRank", "WindowTraits", "PercentRank", "CumeDist", "NTile"}));
544544

545545
auto traitsInputTypeNode = calcSpec->Child(0);
546546
YQL_ENSURE(traitsInputTypeNode->GetTypeAnn());
@@ -5948,7 +5948,7 @@ namespace {
59485948
}
59495949
auto currColumn = input->Child(i)->Child(0)->Content();
59505950
auto calcSpec = input->Child(i)->Child(1);
5951-
if (!calcSpec->IsCallable({"WindowTraits", "Lag", "Lead", "RowNumber", "Rank", "DenseRank", "Void"})) {
5951+
if (!calcSpec->IsCallable({"WindowTraits", "Lag", "Lead", "RowNumber", "Rank", "DenseRank", "PercentRank", "CumeDist", "NTile", "Void"})) {
59525952
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(calcSpec->Pos()),
59535953
"Invalid traits or special function for calculation on window"));
59545954
return IGraphTransformer::TStatus::Error;
@@ -6305,6 +6305,26 @@ namespace {
63056305
if (auto status = EnsureTypeRewrite(input->HeadRef(), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
63066306
return status;
63076307
}
6308+
input->SetTypeAnn(ctx.Expr.MakeType<TDataExprType>(input->IsCallable("CumeDist") ? EDataSlot::Double : EDataSlot::Uint64));
6309+
return IGraphTransformer::TStatus::Ok;
6310+
}
6311+
6312+
IGraphTransformer::TStatus WinNTileWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
6313+
Y_UNUSED(output);
6314+
if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
6315+
return IGraphTransformer::TStatus::Error;
6316+
}
6317+
6318+
if (auto status = EnsureTypeRewrite(input->HeadRef(), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
6319+
return status;
6320+
}
6321+
6322+
auto expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
6323+
auto status = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr);
6324+
if (status.Level != IGraphTransformer::TStatus::Ok) {
6325+
return status;
6326+
}
6327+
63086328
input->SetTypeAnn(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64));
63096329
return IGraphTransformer::TStatus::Ok;
63106330
}
@@ -6403,7 +6423,8 @@ namespace {
64036423
return IGraphTransformer::TStatus::Repeat;
64046424
}
64056425

6406-
const TTypeAnnotationNode* outputType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64);
6426+
const TTypeAnnotationNode* outputType = ctx.Expr.MakeType<TDataExprType>(input->IsCallable("PercentRank") ?
6427+
EDataSlot::Double : EDataSlot::Uint64);
64076428
if (!isAnsi && keyType->GetKind() == ETypeAnnotationKind::Optional) {
64086429
outputType = ctx.Expr.MakeType<TOptionalExprType>(outputType);
64096430
}

ydb/library/yql/core/type_ann/type_ann_list.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ namespace NTypeAnnImpl {
115115
IGraphTransformer::TStatus WinLeadLagWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
116116
IGraphTransformer::TStatus WinRowNumberWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
117117
IGraphTransformer::TStatus WinRankWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
118+
IGraphTransformer::TStatus WinNTileWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
118119
IGraphTransformer::TStatus HoppingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
119120
IGraphTransformer::TStatus MultiHoppingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
120121
IGraphTransformer::TStatus HoppingTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);

ydb/library/yql/core/type_ann/type_ann_pg.cpp

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -774,14 +774,6 @@ IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TEx
774774
return IGraphTransformer::TStatus::Error;
775775
}
776776

777-
auto name = input->Child(4)->GetTypeAnn()->Cast<TPgExprType>()->GetName();
778-
if (name != "int4") {
779-
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(4)->Pos()), TStringBuilder() <<
780-
"Expected pgint4 type, but got: " << name));
781-
return IGraphTransformer::TStatus::Error;
782-
}
783-
784-
785777
auto arg = input->Child(3)->GetTypeAnn();
786778
if (arg->IsOptionalOrNull()) {
787779
input->SetTypeAnn(arg);
@@ -796,6 +788,35 @@ IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TEx
796788
}
797789

798790
input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(NPg::LookupType("int8").TypeId));
791+
} else if (name == "cume_dist" || name == "percent_rank") {
792+
if (input->ChildrenSize() != 3) {
793+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
794+
TStringBuilder() << "Expected no arguments in function " << name));
795+
return IGraphTransformer::TStatus::Error;
796+
}
797+
798+
input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(NPg::LookupType("float8").TypeId));
799+
} else if (name == "ntile") {
800+
if (input->ChildrenSize() != 4) {
801+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
802+
TStringBuilder() << "Expected exactly one argument in function " << name));
803+
return IGraphTransformer::TStatus::Error;
804+
}
805+
806+
if (input->Child(3)->GetTypeAnn() && input->Child(3)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
807+
auto name = input->Child(3)->GetTypeAnn()->Cast<TPgExprType>()->GetName();
808+
if (name != "int4") {
809+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(3)->Pos()), TStringBuilder() <<
810+
"Expected int4 type, but got: " << name));
811+
return IGraphTransformer::TStatus::Error;
812+
}
813+
} else {
814+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(3)->Pos()), TStringBuilder() <<
815+
"Expected pg type, but got: " << input->Child(3)->GetTypeAnn()->GetKind()));
816+
return IGraphTransformer::TStatus::Error;
817+
}
818+
819+
input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(NPg::LookupType("int4").TypeId));
799820
} else {
800821
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
801822
TStringBuilder() << "Unsupported function: " << name));

0 commit comments

Comments
 (0)