Skip to content

Commit 7557e9e

Browse files
committed
remove code duplication
1 parent 8ae12f4 commit 7557e9e

File tree

4 files changed

+263
-774
lines changed

4 files changed

+263
-774
lines changed

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3302,6 +3302,111 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
33023302
return aggr.Ptr();
33033303
}
33043304

3305+
TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
3306+
const auto pos = aggregate.Pos();
3307+
3308+
NHopping::EnsureNotDistinct(aggregate);
3309+
3310+
const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
3311+
if (!maybeHopTraits) {
3312+
return nullptr;
3313+
}
3314+
const auto hopTraits = *maybeHopTraits;
3315+
3316+
const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
3317+
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);
3318+
3319+
// if (keysDescription.NeedPickle()) {
3320+
// return Build<TCoMap>(ctx, pos)
3321+
// .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
3322+
// .Input<TCoAggregate>()
3323+
// .InitFrom(aggregate)
3324+
// .Input<TCoMap>()
3325+
// .Lambda(keysDescription.BuildPickleLambda(ctx, pos))
3326+
// .Input(aggregate.Input())
3327+
// .Build()
3328+
// .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx))
3329+
// .Build()
3330+
// .Done()
3331+
// .Ptr();
3332+
// }
3333+
3334+
const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
3335+
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
3336+
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
3337+
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
3338+
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
3339+
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
3340+
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
3341+
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);
3342+
3343+
const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
3344+
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
3345+
.KeyExtractor(keyLambda)
3346+
.TimeExtractor(timeExtractorLambda)
3347+
.Hop(hopTraits.Traits.Hop())
3348+
.Interval(hopTraits.Traits.Interval())
3349+
.Delay(hopTraits.Traits.Delay())
3350+
.DataWatermarks(hopTraits.Traits.DataWatermarks())
3351+
.InitHandler(initLambda)
3352+
.UpdateHandler(updateLambda)
3353+
.MergeHandler(mergeLambda)
3354+
.FinishHandler(finishLambda)
3355+
.SaveHandler(saveLambda)
3356+
.LoadHandler(loadLambda)
3357+
.template WatermarkMode<TCoAtom>().Build(ToString(false));
3358+
3359+
return Build<TCoPartitionsByKeys>(ctx, pos)
3360+
.Input(aggregate.Input())
3361+
.KeySelectorLambda(keyLambda)
3362+
.SortDirections<TCoBool>()
3363+
.Literal()
3364+
.Value("true")
3365+
.Build()
3366+
.Build()
3367+
.SortKeySelectorLambda(timeExtractorLambda)
3368+
.ListHandlerLambda()
3369+
.Args(streamArg)
3370+
.template Body<TCoForwardList>()
3371+
.Stream(multiHoppingCoreBuilder
3372+
.template Input<TCoIterator>()
3373+
.List(streamArg)
3374+
.Build()
3375+
.Done())
3376+
.Build()
3377+
.Build()
3378+
.Done()
3379+
.Ptr();
3380+
}
3381+
3382+
TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
3383+
const auto aggregate = TCoAggregate(node);
3384+
3385+
if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) {
3386+
return nullptr;
3387+
}
3388+
3389+
if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
3390+
return nullptr;
3391+
}
3392+
3393+
auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
3394+
if (!result) {
3395+
return result;
3396+
}
3397+
3398+
auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
3399+
if (!outputColumnSetting) {
3400+
return result;
3401+
}
3402+
3403+
return Build<TCoExtractMembers>(ctx, aggregate.Pos())
3404+
.Input(result)
3405+
.Members(outputColumnSetting->ChildPtr(1))
3406+
.Done()
3407+
.Ptr();
3408+
}
3409+
33053410
TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
33063411
TVector<ui32> withAssume;
33073412
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
@@ -5081,7 +5186,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
50815186
return clean;
50825187
}
50835188

5084-
if (auto hopping = NHopping::RewriteAsHoppingWindow(node, ctx); hopping) {
5189+
if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) {
50855190
return hopping;
50865191
}
50875192

0 commit comments

Comments
 (0)