Skip to content

Commit d5247f0

Browse files
authored
Merge 7557e9e into 2a0bfb0
2 parents 2a0bfb0 + 7557e9e commit d5247f0

File tree

11 files changed

+836
-570
lines changed

11 files changed

+836
-570
lines changed

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/yql/core/yql_atom_enums.h>
66
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
77
#include <ydb/library/yql/core/yql_join.h>
8+
#include <ydb/library/yql/core/yql_opt_hopping.h>
89
#include <ydb/library/yql/core/yql_opt_utils.h>
910
#include <ydb/library/yql/core/yql_opt_window.h>
1011
#include <ydb/library/yql/core/yql_type_helpers.h>
@@ -3301,6 +3302,111 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
33013302
return aggr.Ptr();
33023303
}
33033304

3305+
TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
3306+
const auto pos = aggregate.Pos();
3307+
3308+
NHopping::EnsureNotDistinct(aggregate);
3309+
3310+
const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
3311+
if (!maybeHopTraits) {
3312+
return nullptr;
3313+
}
3314+
const auto hopTraits = *maybeHopTraits;
3315+
3316+
const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
3317+
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);
3318+
3319+
// if (keysDescription.NeedPickle()) {
3320+
// return Build<TCoMap>(ctx, pos)
3321+
// .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
3322+
// .Input<TCoAggregate>()
3323+
// .InitFrom(aggregate)
3324+
// .Input<TCoMap>()
3325+
// .Lambda(keysDescription.BuildPickleLambda(ctx, pos))
3326+
// .Input(aggregate.Input())
3327+
// .Build()
3328+
// .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx))
3329+
// .Build()
3330+
// .Done()
3331+
// .Ptr();
3332+
// }
3333+
3334+
const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
3335+
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
3336+
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
3337+
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
3338+
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
3339+
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
3340+
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
3341+
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);
3342+
3343+
const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
3344+
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
3345+
.KeyExtractor(keyLambda)
3346+
.TimeExtractor(timeExtractorLambda)
3347+
.Hop(hopTraits.Traits.Hop())
3348+
.Interval(hopTraits.Traits.Interval())
3349+
.Delay(hopTraits.Traits.Delay())
3350+
.DataWatermarks(hopTraits.Traits.DataWatermarks())
3351+
.InitHandler(initLambda)
3352+
.UpdateHandler(updateLambda)
3353+
.MergeHandler(mergeLambda)
3354+
.FinishHandler(finishLambda)
3355+
.SaveHandler(saveLambda)
3356+
.LoadHandler(loadLambda)
3357+
.template WatermarkMode<TCoAtom>().Build(ToString(false));
3358+
3359+
return Build<TCoPartitionsByKeys>(ctx, pos)
3360+
.Input(aggregate.Input())
3361+
.KeySelectorLambda(keyLambda)
3362+
.SortDirections<TCoBool>()
3363+
.Literal()
3364+
.Value("true")
3365+
.Build()
3366+
.Build()
3367+
.SortKeySelectorLambda(timeExtractorLambda)
3368+
.ListHandlerLambda()
3369+
.Args(streamArg)
3370+
.template Body<TCoForwardList>()
3371+
.Stream(multiHoppingCoreBuilder
3372+
.template Input<TCoIterator>()
3373+
.List(streamArg)
3374+
.Build()
3375+
.Done())
3376+
.Build()
3377+
.Build()
3378+
.Done()
3379+
.Ptr();
3380+
}
3381+
3382+
TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
3383+
const auto aggregate = TCoAggregate(node);
3384+
3385+
if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) {
3386+
return nullptr;
3387+
}
3388+
3389+
if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
3390+
return nullptr;
3391+
}
3392+
3393+
auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
3394+
if (!result) {
3395+
return result;
3396+
}
3397+
3398+
auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
3399+
if (!outputColumnSetting) {
3400+
return result;
3401+
}
3402+
3403+
return Build<TCoExtractMembers>(ctx, aggregate.Pos())
3404+
.Input(result)
3405+
.Members(outputColumnSetting->ChildPtr(1))
3406+
.Done()
3407+
.Ptr();
3408+
}
3409+
33043410
TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
33053411
TVector<ui32> withAssume;
33063412
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
@@ -5080,6 +5186,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
50805186
return clean;
50815187
}
50825188

5189+
if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) {
5190+
return hopping;
5191+
}
5192+
50835193
return DropReorder<false>(node, ctx);
50845194
};
50855195

ydb/library/yql/core/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ SRCS(
2828
yql_join.cpp
2929
yql_join.h
3030
yql_library_compiler.cpp
31+
yql_opt_hopping.cpp
3132
yql_opt_match_recognize.cpp
3233
yql_opt_match_recognize.h
3334
yql_opt_proposed_by_data.cpp

ydb/library/yql/core/yql_expr_constraint.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
244244
Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
245245
Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
246246
Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
247+
Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap;
247248
}
248249

249250
std::optional<IGraphTransformer::TStatus> ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
@@ -2920,6 +2921,23 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
29202921

29212922
return TStatus::Ok;
29222923
}
2924+
2925+
TStatus MultiHoppingCoreWrap(const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) const {
2926+
if (const auto status = UpdateAllChildLambdasConstraints(*input); status != TStatus::Ok) {
2927+
return status;
2928+
}
2929+
2930+
TExprNode::TPtr keySelectorLambda = input->Child(TCoMultiHoppingCore::idx_KeyExtractor);
2931+
std::vector<std::string_view> columns;
2932+
ExtractKeys(*keySelectorLambda, columns);
2933+
if (!columns.empty()) {
2934+
input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(columns));
2935+
input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(columns));
2936+
}
2937+
2938+
return TStatus::Ok;
2939+
}
2940+
29232941
private:
29242942
template <class TConstraintContainer>
29252943
static void CopyExcept(TConstraintContainer& dst, const TConstraintContainer& from, const TSet<TStringBuf>& except) {
@@ -2939,7 +2957,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
29392957
}
29402958
}
29412959

2942-
static void ExtractKeys(const TExprNode& keySelectorLambda, TVector<TStringBuf>& columns) {
2960+
static void ExtractKeys(const TExprNode& keySelectorLambda, std::vector<std::string_view>& columns) {
29432961
const auto arg = keySelectorLambda.Head().Child(0);
29442962
auto body = keySelectorLambda.Child(1);
29452963
if (body->IsCallable("StablePickle")) {

0 commit comments

Comments
 (0)