Skip to content

Commit cfe1520

Browse files
authored
fix DqReplicate expansion (#818)
1 parent af6fb5d commit cfe1520

File tree

1 file changed

+19
-1
lines changed

1 file changed

+19
-1
lines changed

ydb/library/yql/dq/opt/dq_opt_peephole.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,11 +607,29 @@ NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExp
607607

608608
TVector<TExprBase> branches;
609609
branches.reserve(dqReplicate.Args().Count() - 1);
610+
const auto inputKind = dqReplicate.Arg(0).Ref().GetTypeAnn()->GetKind();
611+
YQL_ENSURE(inputKind == ETypeAnnotationKind::Stream || inputKind == ETypeAnnotationKind::Flow);
610612

611613
auto inputIndex = NDq::BuildAtomList("0", dqReplicate.Pos(), ctx);
612614
for (size_t i = 1; i < dqReplicate.Args().Count(); ++i) {
613615
branches.emplace_back(inputIndex);
614-
branches.emplace_back(ctx.DeepCopyLambda(dqReplicate.Args().Get(i).Ref()));
616+
const auto lambdaOutputKind = dqReplicate.Arg(i).Ref().GetTypeAnn()->GetKind();
617+
YQL_ENSURE(lambdaOutputKind == ETypeAnnotationKind::Stream || lambdaOutputKind == ETypeAnnotationKind::Flow);
618+
if (lambdaOutputKind != inputKind) {
619+
branches.emplace_back(ctx.Builder(dqReplicate.Arg(i).Pos())
620+
.Lambda()
621+
.Param("arg")
622+
.Callable(lambdaOutputKind == ETypeAnnotationKind::Stream ? "ToFlow" : "FromFlow")
623+
.Apply(0, dqReplicate.Arg(i).Ptr())
624+
.With(0, "arg")
625+
.Seal()
626+
.Seal()
627+
.Seal()
628+
.Build()
629+
);
630+
} else {
631+
branches.emplace_back(ctx.DeepCopyLambda(dqReplicate.Arg(i).Ref()));
632+
}
615633
}
616634

617635
return Build<TCoSwitch>(ctx, dqReplicate.Pos())

0 commit comments

Comments
 (0)