Skip to content

Commit 90166bc

Browse files
authored
Make DQ and KQP use both WideStream and WideFlow for WideFromBlocks (#12691)
1 parent 3d0c4ef commit 90166bc

File tree

6 files changed

+182
-56
lines changed

6 files changed

+182
-56
lines changed

ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,24 @@ bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TE
267267
return resolveStatus == IArrowResolver::OK;
268268
}
269269

270+
// XXX: Unfortunately, KQP optimizer pipeline is not quite
271+
// predictable, so there are no guarantees whether the pair
272+
// (FromFlow(ToFlow(...))) is already squashed or not.
273+
// The helper below checks both cases to find WideFromBlocks
274+
// input node:
275+
// * (WideFromBlocks(...))
276+
// * (FromFlow(ToFlow(WideFromBlocks(...))))
277+
// FIXME: To suppress compiler warnings when the constexpr value
278+
// NYql::NBlockStreamIO::WideFromBlocks is false, Y_DECLARE_UNUSED
279+
// attribute is used.
280+
Y_DECLARE_UNUSED
static inline bool IsNodeWideFromBlocks(const TMaybeNode<TExprBase> body) {
    // Returns true when `body` has one of the two accepted shapes:
    //   * (WideFromBlocks(...))
    //   * (FromFlow(ToFlow(WideFromBlocks(...))))

    // Direct form: the node itself is WideFromBlocks.
    if (body.Maybe<TCoWideFromBlocks>()) {
        return true;
    }

    // Wrapped form: peel FromFlow, then ToFlow, and look for WideFromBlocks underneath.
    if (!body.Maybe<TCoFromFlow>()) {
        return false;
    }

    const auto fromFlowInput = body.Cast<TCoFromFlow>().Input();
    if (!fromFlowInput.Maybe<TCoToFlow>()) {
        return false;
    }

    if (fromFlowInput.Cast<TCoToFlow>().Input().Maybe<TCoWideFromBlocks>()) {
        return true;
    }
    return false;
}
287+
270288
// TODO: composite copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L388
271289
bool CanPropagateWideBlockThroughChannel(
272290
const TDqOutput& output,
@@ -293,11 +311,20 @@ bool CanPropagateWideBlockThroughChannel(
293311
return false;
294312
}
295313

296-
// Ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)))
297-
if (!program.Lambda().Body().Maybe<TCoFromFlow>() ||
298-
!program.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
299-
{
300-
return false;
314+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
315+
// Ensure that stage has blocks on top level (i.e. (FromFlow(WideFromBlocks(...)))).
316+
if (!program.Lambda().Body().Maybe<TCoFromFlow>() ||
317+
!program.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
318+
{
319+
return false;
320+
}
321+
} else {
322+
// Ensure that stage has blocks on top level (i.e. either
323+
// (WideFromBlocks(...)) or (FromFlow(ToFlow(WideFromBlocks(...))))).
324+
// See the rationale for the alternatives near IsNodeWideFromBlocks.
325+
if (!IsNodeWideFromBlocks(program.Lambda().Body())) {
326+
return false;
327+
}
301328
}
302329

303330
auto typeAnnotation = program.Lambda().Ref().GetTypeAnn();
@@ -371,31 +398,61 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
371398
if (auto connection = stage.Inputs().Item(i).Maybe<TDqConnection>(); connection &&
372399
CanPropagateWideBlockThroughChannel(connection.Cast().Output(), programs, TDqStageSettings::Parse(stage), ctx, typesCtx))
373400
{
374-
TExprNode::TPtr newArgNode = ctx.Builder(oldArg.Pos())
375-
.Callable("FromFlow")
376-
.Callable(0, "WideFromBlocks")
377-
.Callable(0, "ToFlow")
378-
.Add(0, newArg.Ptr())
401+
TExprNode::TPtr newArgNode;
402+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
403+
newArgNode = ctx.Builder(oldArg.Pos())
404+
.Callable("FromFlow")
405+
.Callable(0, "WideFromBlocks")
406+
.Callable(0, "ToFlow")
407+
.Add(0, newArg.Ptr())
408+
.Seal()
379409
.Seal()
380410
.Seal()
381-
.Seal()
382-
.Build();
411+
.Build();
412+
} else {
413+
newArgNode = ctx.Builder(oldArg.Pos())
414+
.Callable("WideFromBlocks")
415+
.Add(0, newArg.Ptr())
416+
.Seal()
417+
.Build();
418+
}
419+
383420
argsMap.emplace(oldArg.Raw(), newArgNode);
384421

385422
auto stageUid = connection.Cast().Output().Stage().Ref().UniqueId();
386423

387-
// Update input program with: FromFlow(WideFromBlocks($1)) → FromFlow($1)
388-
if (const auto& inputProgram = programs.at(stageUid); inputProgram.Lambda().Body().Maybe<TCoFromFlow>() &&
389-
inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
390-
{
391-
auto newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda().Body().Cast<TCoFromFlow>().Pos())
392-
.Input(inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Cast<TCoWideFromBlocks>().Input())
393-
.Done();
424+
const auto& inputProgram = programs.at(stageUid);
425+
TMaybeNode<TExprBase> newBody;
426+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
427+
// Update input program with: (FromFlow(WideFromBlocks($1))) -> (FromFlow($1))
428+
if (inputProgram.Lambda().Body().Maybe<TCoFromFlow>() &&
429+
inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
430+
{
431+
newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda().Body().Cast<TCoFromFlow>().Pos())
432+
.Input(inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Cast<TCoWideFromBlocks>().Input())
433+
.Done();
434+
}
435+
} else {
436+
// Update input program with one of the following:
437+
// * (WideFromBlocks($1)) -> ($1)
438+
// * (FromFlow(ToFlow(WideFromBlocks($1)))) -> ($1)
439+
const auto& body = inputProgram.Lambda().Body();
440+
if (IsNodeWideFromBlocks(body)) {
441+
if (body.Maybe<TCoWideFromBlocks>()) {
442+
newBody = body.Cast<TCoWideFromBlocks>().Input();
443+
} else {
444+
newBody = body.Cast<TCoFromFlow>().Input()
445+
.Cast<TCoToFlow>().Input()
446+
.Cast<TCoWideFromBlocks>().Input();
447+
}
448+
}
449+
}
394450

451+
if (newBody) {
395452
auto newInputProgram = Build<TKqpProgram>(ctx, inputProgram.Pos())
396453
.Lambda()
397454
.Args(inputProgram.Lambda().Args())
398-
.Body(newBody)
455+
.Body(newBody.Cast())
399456
.Build()
400457
.ArgsType(inputProgram.ArgsType())
401458
.Done();

ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,33 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
5959
auto read = maybeRead.Cast();
6060

6161
if (typesCtx.IsBlockEngineEnabled()) {
62-
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
63-
.Input<TKqpBlockReadOlapTableRanges>()
64-
.Table(read.Table())
65-
.Ranges(read.Ranges())
66-
.Columns(read.Columns())
67-
.Settings(read.Settings())
68-
.ExplainPrompt(read.ExplainPrompt())
69-
.Process(read.Process())
70-
.Build()
71-
.Done();
62+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
63+
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
64+
.Input<TKqpBlockReadOlapTableRanges>()
65+
.Table(read.Table())
66+
.Ranges(read.Ranges())
67+
.Columns(read.Columns())
68+
.Settings(read.Settings())
69+
.ExplainPrompt(read.ExplainPrompt())
70+
.Process(read.Process())
71+
.Build()
72+
.Done();
73+
} else {
74+
wideRead = Build<TCoToFlow>(ctx, node.Pos())
75+
.Input<TCoWideFromBlocks>()
76+
.Input<TCoFromFlow>()
77+
.Input<TKqpBlockReadOlapTableRanges>()
78+
.Table(read.Table())
79+
.Ranges(read.Ranges())
80+
.Columns(read.Columns())
81+
.Settings(read.Settings())
82+
.ExplainPrompt(read.ExplainPrompt())
83+
.Process(read.Process())
84+
.Build()
85+
.Build()
86+
.Build()
87+
.Done();
88+
}
7289
} else {
7390
wideRead = Build<TKqpWideReadOlapTableRanges>(ctx, node.Pos())
7491
.Table(read.Table())

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2536,16 +2536,28 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
25362536

25372537
switch (blockChannelsMode) {
25382538
case NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_SCALAR:
2539-
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("return (FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2539+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
2540+
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("return (FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2541+
} else {
2542+
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("return (FromFlow (NarrowMap (ToFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2543+
}
25402544
break;
25412545
case NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_AUTO:
2542-
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2546+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
2547+
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2548+
} else {
2549+
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(WideFromBlocks"), plan.QueryStats->Getquery_ast());
2550+
}
25432551
UNIT_ASSERT_C(!plan.QueryStats->Getquery_ast().Contains("WideToBlocks"), plan.QueryStats->Getquery_ast());
25442552
UNIT_ASSERT_EQUAL_C(plan.QueryStats->Getquery_ast().find("WideFromBlocks"), plan.QueryStats->Getquery_ast().rfind("WideFromBlocks"), plan.QueryStats->Getquery_ast());
25452553
break;
25462554
case NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_FORCE:
25472555
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (WideSortBlocks"), plan.QueryStats->Getquery_ast());
2548-
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2556+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
2557+
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2558+
} else {
2559+
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (NarrowMap (ToFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
2560+
}
25492561
break;
25502562
}
25512563
}

ydb/library/yql/dq/opt/dq_opt_build.cpp

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -768,10 +768,16 @@ bool CanRebuildForWideBlockChannelOutput(bool forceBlocks, const TDqPhyStage& st
768768

769769
if (!forceBlocks) {
770770
// ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)), or a bare WideFromBlocks(...) when NYql::NBlockStreamIO::WideFromBlocks is set)
771-
if (!stage.Program().Body().Maybe<TCoFromFlow>() ||
772-
!stage.Program().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
773-
{
774-
return false;
771+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
772+
if (!stage.Program().Body().Maybe<TCoFromFlow>() ||
773+
!stage.Program().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
774+
{
775+
return false;
776+
}
777+
} else {
778+
if (!stage.Program().Body().Maybe<TCoWideFromBlocks>()) {
779+
return false;
780+
}
775781
}
776782
}
777783

@@ -826,15 +832,24 @@ TDqPhyStage RebuildStageInputsAsWideBlock(bool forceBlocks, const TDqPhyStage& s
826832
if (maybeConn && IsSupportedForWideBlocks(maybeConn.Cast()) && CanRebuildForWideBlockChannelOutput(forceBlocks, maybeConn.Cast().Output(), ctx, typesCtx)) {
827833
++blockInputs;
828834
// input will actually be wide block stream - convert it to wide stream first
829-
TExprNode::TPtr newArgNode = ctx.Builder(arg.Pos())
830-
.Callable("FromFlow")
831-
.Callable(0, "WideFromBlocks")
832-
.Callable(0, "ToFlow")
833-
.Add(0, newArg.Ptr())
835+
TExprNode::TPtr newArgNode;
836+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
837+
newArgNode = ctx.Builder(arg.Pos())
838+
.Callable("FromFlow")
839+
.Callable(0, "WideFromBlocks")
840+
.Callable(0, "ToFlow")
841+
.Add(0, newArg.Ptr())
842+
.Seal()
834843
.Seal()
835844
.Seal()
836-
.Seal()
837-
.Build();
845+
.Build();
846+
} else {
847+
newArgNode = ctx.Builder(arg.Pos())
848+
.Callable("WideFromBlocks")
849+
.Add(0, newArg.Ptr())
850+
.Seal()
851+
.Build();
852+
}
838853
argsMap.emplace(arg.Raw(), newArgNode);
839854

840855
const TDqConnection& conn = maybeConn.Cast();

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2946,11 +2946,23 @@ NNodes::TExprBase DqBuildStageWithSourceWrap(NNodes::TExprBase node, TExprContex
29462946
.Build();
29472947

29482948
if (supportsBlocks) {
2949-
wideWrap = ctx.Builder(node.Pos())
2950-
.Callable("WideFromBlocks")
2951-
.Add(0, wideWrap)
2952-
.Seal()
2953-
.Build();
2949+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
2950+
wideWrap = ctx.Builder(node.Pos())
2951+
.Callable("WideFromBlocks")
2952+
.Add(0, wideWrap)
2953+
.Seal()
2954+
.Build();
2955+
} else {
2956+
wideWrap = ctx.Builder(node.Pos())
2957+
.Callable("ToFlow")
2958+
.Callable(0, "WideFromBlocks")
2959+
.Callable(0, "FromFlow")
2960+
.Add(0, wideWrap)
2961+
.Seal()
2962+
.Seal()
2963+
.Seal()
2964+
.Build();
2965+
}
29542966
}
29552967

29562968
auto narrow = ctx.Builder(node.Pos())

ydb/library/yql/providers/dq/opt/dqs_opt.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,29 @@ namespace NYql::NDqs {
9494
}
9595

9696
YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration";
97-
return Build<TCoWideFromBlocks>(ctx, node->Pos())
98-
.Input(
99-
Build<TCoToFlow>(ctx, node->Pos())
97+
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
98+
return Build<TCoWideFromBlocks>(ctx, node->Pos())
99+
.Input(Build<TCoToFlow>(ctx, node->Pos())
100100
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
101-
.Input(readWideWrap.Input())
102-
.Flags(readWideWrap.Flags())
103-
.Token(readWideWrap.Token())
104-
.Done().Ptr())
101+
.Input(readWideWrap.Input())
102+
.Flags(readWideWrap.Flags())
103+
.Token(readWideWrap.Token())
104+
.Done().Ptr())
105105
.Done())
106106
.Done().Ptr();
107+
}
108+
109+
YQL_ENSURE(NYql::NBlockStreamIO::WideFromBlocks);
110+
111+
return Build<TCoToFlow>(ctx, node->Pos())
112+
.Input(Build<TCoWideFromBlocks>(ctx, node->Pos())
113+
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
114+
.Input(readWideWrap.Input())
115+
.Flags(readWideWrap.Flags())
116+
.Token(readWideWrap.Token())
117+
.Done().Ptr())
118+
.Done())
119+
.Done().Ptr();
107120
}, ctx, optSettings);
108121
});
109122
}

0 commit comments

Comments
 (0)