Skip to content

Commit a7b411f

Browse files
authored
Merge 6315ad7 into db1f75f
2 parents db1f75f + 6315ad7 commit a7b411f

File tree

6 files changed

+199
-29
lines changed

6 files changed

+199
-29
lines changed

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,16 @@ std::pair<
522522
} else {
523523
result[i] = { EOutputRowItemSource::LookupOther, lookupPayloadColumns.at(name) };
524524
}
525+
} else if (leftLabel.empty()) {
526+
const auto name = prefixedName;
527+
if (auto j = leftJoinColumns.FindPtr(name)) {
528+
result[i] = { EOutputRowItemSource::InputKey, lookupKeyColumns.at(rightNames[*j]) };
529+
} else if (auto k = inputColumns.FindPtr(name)) {
530+
result[i] = { EOutputRowItemSource::InputOther, otherInputIndexes.size() };
531+
otherInputIndexes.push_back(*k);
532+
} else {
533+
Y_ABORT();
534+
}
525535
} else {
526536
Y_ABORT();
527537
}

ydb/library/yql/dq/type_ann/dq_type_ann.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,21 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
625625
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
626626
return TStatus::Error;
627627
}
628+
auto rightInput = cnStreamLookup.RightInput();
629+
if (!rightInput.Raw()->IsCallable("TDqLookupSourceWrap")) {
630+
ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), "Unknown DqCnStreamLookup RightInput type (must be TDqLookupSourceWrap)"));
631+
return TStatus::Error;
632+
}
628633
const auto leftRowType = GetSeqItemType(leftInputType);
629-
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
634+
if (!leftRowType) {
635+
ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), "DqCnStreamLookup: Failed to annotate left row type"));
636+
return TStatus::Error;
637+
}
638+
const auto rightRowType = GetSeqItemType(rightInput.Raw()->GetTypeAnn());
639+
if (!rightRowType) {
640+
ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), "DqCnStreamLookup: Failed to annotate right row type"));
641+
return TStatus::Error;
642+
}
630643
const auto outputRowType = GetDqJoinResultType<true>(
631644
input->Pos(),
632645
*leftRowType->Cast<TStructExprType>(),
@@ -637,6 +650,10 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
637650
cnStreamLookup.JoinKeys(),
638651
ctx
639652
);
653+
if (!outputRowType) {
654+
ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), "DqCnStreamLookup: Failed to annotate output row type"));
655+
return TStatus::Error;
656+
}
640657
if (!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxCachedRows().Ref(), "MaxCachedRows", ctx) ||
641658
!EnsureConvertibleTo<ui64>(cnStreamLookup.TTL().Ref(), "TTL", ctx) ||
642659
!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxDelayedRows().Ref(), "MaxDelayedRows", ctx)) {

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -225,36 +225,64 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
225225
return TDqLookupSourceWrap(lookupSourceWrap);
226226
}
227227

228-
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
229-
Y_UNUSED(ctx);
230-
const auto equiJoin = node.Cast<TCoEquiJoin>();
231-
if (equiJoin.ArgCount() != 4) { // 2 parties join
232-
return node;
228+
// Recursively walk join tree and replace right-side of StreamLookupJoin
229+
ui32 RewriteStreamJoinTuple(ui32 idx, const TCoEquiJoin& equiJoin, const TCoEquiJoinTuple& joinTuple, std::vector<TExprNode::TPtr>& args, TExprContext& ctx, bool& changed) {
230+
// recursion depth O(args.size())
231+
Y_ENSURE(idx < args.size());
232+
// handle left side
233+
if (!joinTuple.LeftScope().Maybe<TCoAtom>()) {
234+
idx = RewriteStreamJoinTuple(idx, equiJoin, joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), args, ctx, changed);
235+
} else {
236+
++idx;
237+
}
238+
// handle right side
239+
if (!joinTuple.RightScope().Maybe<TCoAtom>()) {
240+
return RewriteStreamJoinTuple(idx, equiJoin, joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), args, ctx, changed);
233241
}
234-
const auto left = equiJoin.Arg(0).Cast<TCoEquiJoinInput>().List();
235-
const auto right = equiJoin.Arg(1).Cast<TCoEquiJoinInput>().List();
236-
const auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>();
242+
Y_ENSURE(idx < args.size());
237243
if (!IsStreamLookup(joinTuple)) {
238-
return node;
244+
return idx + 1;
239245
}
240-
if (!right.Maybe<TDqSourceWrap>() && !right.Maybe<TDqReadWrap>()) {
241-
return node;
246+
auto right = equiJoin.Arg(idx).Cast<TCoEquiJoinInput>();
247+
auto rightList = right.List();
248+
if (auto maybeExtractMembers = rightList.Maybe<TCoExtractMembers>()) {
249+
rightList = maybeExtractMembers.Cast().Input();
242250
}
251+
TExprNode::TPtr lookupSourceWrap;
252+
if (auto maybeSource = rightList.Maybe<TDqSourceWrap>()) {
253+
lookupSourceWrap = LookupSourceFromSource(maybeSource.Cast(), ctx).Ptr();
254+
} else if (auto maybeRead = rightList.Maybe<TDqReadWrap>()) {
255+
lookupSourceWrap = LookupSourceFromRead(maybeRead.Cast(), ctx).Ptr();
256+
} else {
257+
return idx + 1;
258+
}
259+
changed = true;
260+
args[idx] =
261+
Build<TCoEquiJoinInput>(ctx, joinTuple.Pos())
262+
.List(lookupSourceWrap)
263+
.Scope(right.Scope())
264+
.Done().Ptr();
265+
return idx + 1;
266+
}
243267

244-
TDqLookupSourceWrap lookupSourceWrap = right.Maybe<TDqSourceWrap>()
245-
? LookupSourceFromSource(right.Cast<TDqSourceWrap>(), ctx)
246-
: LookupSourceFromRead(right.Cast<TDqReadWrap>(), ctx)
247-
;
248-
249-
return Build<TCoEquiJoin>(ctx, node.Pos())
250-
.Add(equiJoin.Arg(0))
251-
.Add<TCoEquiJoinInput>()
252-
.List(lookupSourceWrap)
253-
.Scope(equiJoin.Arg(1).Cast<TCoEquiJoinInput>().Scope())
254-
.Build()
255-
.Add(equiJoin.Arg(2))
256-
.Add(equiJoin.Arg(3))
257-
.Done();
268+
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
269+
const auto equiJoin = node.Cast<TCoEquiJoin>();
270+
auto argCount = equiJoin.ArgCount();
271+
const auto joinTuple = equiJoin.Arg(argCount - 2).Cast<TCoEquiJoinTuple>();
272+
std::vector<TExprNode::TPtr> args(argCount);
273+
bool changed = false;
274+
auto rightIdx = RewriteStreamJoinTuple(0u, equiJoin, joinTuple, args, ctx, changed);
275+
Y_ENSURE(rightIdx + 2 == argCount);
276+
if (!changed) {
277+
return node;
278+
}
279+
// fill copies of remaining args
280+
for (ui32 i = 0; i < argCount; ++i) {
281+
if (!args[i]) {
282+
args[i] = equiJoin.Arg(i).Ptr();
283+
}
284+
}
285+
return Build<TCoEquiJoin>(ctx, node.Pos()).Add(std::move(args)).Done();
258286
}
259287

260288
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,10 +312,16 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
312312
if (!maxDelayedRows) {
313313
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
314314
}
315+
auto rightInput = join.RightInput().Ptr();
316+
if (auto maybe = TExprBase(rightInput).Maybe<TCoExtractMembers>()) {
317+
rightInput = maybe.Cast().Input().Ptr();
318+
}
319+
auto leftLabel = join.LeftLabel().Maybe<NNodes::TCoAtom>() ? join.LeftLabel().Cast<NNodes::TCoAtom>().Ptr() : ctx.NewAtom(pos, "");
320+
Y_ENSURE(join.RightLabel().Maybe<NNodes::TCoAtom>());
315321
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
316322
.Output(left.Output().Cast())
317-
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
318-
.RightInput(join.RightInput())
323+
.LeftLabel(leftLabel)
324+
.RightInput(rightInput)
319325
.RightLabel(join.RightLabel().Cast<NNodes::TCoAtom>())
320326
.JoinKeys(join.JoinKeys())
321327
.JoinType(join.JoinType())

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ namespace NYql::NDqs {
608608
settings.SetRightLabel(streamLookup.RightLabel().StringValue());
609609
settings.SetJoinType(streamLookup.JoinType().StringValue());
610610
for (const auto& k: streamLookup.LeftJoinKeyNames()) {
611-
*settings.AddLeftJoinKeyNames() = RemoveAliases(k.StringValue());
611+
*settings.AddLeftJoinKeyNames() = streamLookup.LeftLabel().StringValue().empty() ? k.StringValue() : RemoveAliases(k.StringValue());
612612
}
613613
for (const auto& k: streamLookup.RightJoinKeyNames()) {
614614
*settings.AddRightJoinKeyNames() = RemoveAliases(k.StringValue());

ydb/tests/fq/generic/streaming/test_join.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,115 @@ def freeze(json):
412412
]
413413
),
414414
),
415+
# 8
416+
(
417+
R'''
418+
$input = SELECT * FROM myyds.`{input_topic}`
419+
WITH (
420+
FORMAT=json_each_row,
421+
SCHEMA (
422+
za Int32,
423+
yb STRING,
424+
yc Int32,
425+
zd Int32,
426+
)
427+
) ;
428+
429+
$enriched1 = select a, b, c, d, e, f, za, yb, yc, zd
430+
from
431+
$input as e
432+
left join {streamlookup} any ydb_conn_{table_name}.db as u
433+
on(e.za = u.a AND e.yb = u.b)
434+
;
435+
436+
$enriched2 = SELECT e.a AS a, e.b AS b, e.c AS c, e.d AS d, e.e AS e, e.f AS f, za, yb, yc, zd, u.c AS c2, u.d AS d2
437+
from
438+
$enriched1 as e
439+
left join {streamlookup} any ydb_conn_{table_name}.db as u
440+
on(e.za = u.a AND e.yb = u.b)
441+
;
442+
443+
$enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2
444+
from
445+
$enriched2 as e
446+
;
447+
448+
insert into myyds.`{output_topic}`
449+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
450+
''',
451+
ResequenceId(
452+
[
453+
(
454+
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
455+
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}',
456+
),
457+
(
458+
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
459+
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}',
460+
),
461+
(
462+
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
463+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
464+
),
465+
(
466+
'{"id":3,"za":null,"yb":"1","yc":114,"zd":115}',
467+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
468+
),
469+
]
470+
),
471+
),
472+
# 9
473+
(
474+
R'''
475+
$input = SELECT * FROM myyds.`{input_topic}`
476+
WITH (
477+
FORMAT=json_each_row,
478+
SCHEMA (
479+
za Int32,
480+
yb STRING,
481+
yc Int32,
482+
zd Int32,
483+
)
484+
) ;
485+
486+
$enriched12 = select u.a as a, u.b as b, u.c as c, u.d as d, u.e as e, u.f as f, za, yb, yc, zd, u2.c as c2, u2.d as d2
487+
from
488+
$input as e
489+
left join {streamlookup} any ydb_conn_{table_name}.db as u
490+
on(e.za = u.a AND e.yb = u.b)
491+
left join {streamlookup} any ydb_conn_{table_name}.db as u2
492+
on(e.yb = u2.b AND e.za = u2.a)
493+
;
494+
495+
$enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2
496+
from
497+
$enriched12 as e
498+
;
499+
500+
insert into myyds.`{output_topic}`
501+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
502+
''',
503+
ResequenceId(
504+
[
505+
(
506+
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
507+
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}',
508+
),
509+
(
510+
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
511+
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}',
512+
),
513+
(
514+
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
515+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
516+
),
517+
(
518+
'{"id":3,"za":null,"yb":"1","yc":114,"zd":115}',
519+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
520+
),
521+
]
522+
),
523+
),
415524
]
416525

417526

0 commit comments

Comments
 (0)