Skip to content

Commit 38d0188

Browse files
authored
Fix multiple streamlookup (ydb-platform#13426)
1 parent bd79c91 commit 38d0188

File tree

7 files changed

+251
-33
lines changed

7 files changed

+251
-33
lines changed

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,16 @@ std::pair<
522522
} else {
523523
result[i] = { EOutputRowItemSource::LookupOther, lookupPayloadColumns.at(name) };
524524
}
525+
} else if (leftLabel.empty()) {
526+
const auto name = prefixedName;
527+
if (auto j = leftJoinColumns.FindPtr(name)) {
528+
result[i] = { EOutputRowItemSource::InputKey, lookupKeyColumns.at(rightNames[*j]) };
529+
} else if (auto k = inputColumns.FindPtr(name)) {
530+
result[i] = { EOutputRowItemSource::InputOther, otherInputIndexes.size() };
531+
otherInputIndexes.push_back(*k);
532+
} else {
533+
Y_ABORT();
534+
}
525535
} else {
526536
Y_ABORT();
527537
}

ydb/library/yql/dq/type_ann/dq_type_ann.cpp

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,56 @@ TStatus AnnotateDqConnection(const TExprNode::TPtr& input, TExprContext& ctx) {
616616
}
617617

618618
TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx) {
619+
if (!EnsureArgsCount(*input, 11, ctx)) {
620+
return TStatus::Error;
621+
}
622+
if (!EnsureCallable(*input->Child(TDqCnStreamLookup::idx_Output), ctx)) {
623+
return TStatus::Error;
624+
}
625+
if (!TDqOutput::Match(input->Child(TDqCnStreamLookup::idx_Output))) {
626+
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TDqCnStreamLookup::idx_Output)->Pos()), TStringBuilder() << "Expected " << TDqOutput::CallableName()));
627+
return TStatus::Error;
628+
}
629+
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_LeftLabel), ctx)) {
630+
return TStatus::Error;
631+
}
632+
if (!EnsureCallable(*input->Child(TDqCnStreamLookup::idx_RightInput), ctx)) {
633+
return TStatus::Error;
634+
}
635+
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_RightLabel), ctx)) {
636+
return TStatus::Error;
637+
}
638+
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_JoinType), ctx)) {
639+
return TStatus::Error;
640+
}
641+
if (!EnsureTuple(*input->Child(TDqCnStreamLookup::idx_JoinKeys), ctx)) {
642+
return TStatus::Error;
643+
}
644+
for (auto& child: input->Child(TDqCnStreamLookup::idx_JoinKeys)->Children()) {
645+
if (!EnsureTupleSize(*child, 4, ctx)) {
646+
return TStatus::Error;
647+
}
648+
for (auto& subChild: child->Children()) {
649+
if (!EnsureAtom(*subChild, ctx)) {
650+
return TStatus::Error;
651+
}
652+
}
653+
}
654+
if (!EnsureTupleOfAtoms(*input->Child(TDqCnStreamLookup::idx_LeftJoinKeyNames), ctx)) {
655+
return TStatus::Error;
656+
}
657+
if (!EnsureTupleOfAtoms(*input->Child(TDqCnStreamLookup::idx_RightJoinKeyNames), ctx)) {
658+
return TStatus::Error;
659+
}
660+
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_TTL), ctx)) {
661+
return TStatus::Error;
662+
}
663+
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_MaxDelayedRows), ctx)) {
664+
return TStatus::Error;
665+
}
666+
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_MaxCachedRows), ctx)) {
667+
return TStatus::Error;
668+
}
619669
auto cnStreamLookup = TDqCnStreamLookup(input);
620670
auto leftInputType = GetDqConnectionType(TDqConnection(input), ctx);
621671
if (!leftInputType) {
@@ -625,18 +675,33 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
625675
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
626676
return TStatus::Error;
627677
}
628-
const auto leftRowType = GetSeqItemType(leftInputType);
629-
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
678+
auto rightInput = cnStreamLookup.RightInput();
679+
if (!rightInput.Raw()->IsCallable("TDqLookupSourceWrap")) {
680+
ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), TStringBuilder() << "DqCnStreamLookup: RightInput: Expected TDqLookupSourceWrap, but got " << rightInput.Raw()->Content()));
681+
return TStatus::Error;
682+
}
683+
const auto& leftRowType = GetSeqItemType(*leftInputType);
684+
if (!EnsureStructType(input->Pos(), leftRowType, ctx)) {
685+
return TStatus::Error;
686+
}
687+
const auto rightInputType = rightInput.Raw()->GetTypeAnn();
688+
const auto& rightRowType = GetSeqItemType(*rightInputType);
689+
if (!EnsureStructType(input->Pos(), rightRowType, ctx)) {
690+
return TStatus::Error;
691+
}
630692
const auto outputRowType = GetDqJoinResultType<true>(
631693
input->Pos(),
632-
*leftRowType->Cast<TStructExprType>(),
694+
*leftRowType.Cast<TStructExprType>(),
633695
cnStreamLookup.LeftLabel().Cast<TCoAtom>().StringValue(),
634-
*rightRowType->Cast<TStructExprType>(),
696+
*rightRowType.Cast<TStructExprType>(),
635697
cnStreamLookup.RightLabel().StringValue(),
636698
cnStreamLookup.JoinType().StringValue(),
637699
cnStreamLookup.JoinKeys(),
638700
ctx
639701
);
702+
if (!outputRowType) {
703+
return TStatus::Error;
704+
}
640705
if (!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxCachedRows().Ref(), "MaxCachedRows", ctx) ||
641706
!EnsureConvertibleTo<ui64>(cnStreamLookup.TTL().Ref(), "TTL", ctx) ||
642707
!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxDelayedRows().Ref(), "MaxDelayedRows", ctx)) {

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -225,36 +225,64 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
225225
return TDqLookupSourceWrap(lookupSourceWrap);
226226
}
227227

228-
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
229-
Y_UNUSED(ctx);
230-
const auto equiJoin = node.Cast<TCoEquiJoin>();
231-
if (equiJoin.ArgCount() != 4) { // 2 parties join
232-
return node;
228+
// Recursively walk join tree and replace right-side of StreamLookupJoin
229+
ui32 RewriteStreamJoinTuple(ui32 idx, const TCoEquiJoin& equiJoin, const TCoEquiJoinTuple& joinTuple, std::vector<TExprNode::TPtr>& args, TExprContext& ctx, bool& changed) {
230+
// recursion depth O(args.size())
231+
Y_ENSURE(idx < args.size());
232+
// handle left side
233+
if (!joinTuple.LeftScope().Maybe<TCoAtom>()) {
234+
idx = RewriteStreamJoinTuple(idx, equiJoin, joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), args, ctx, changed);
235+
} else {
236+
++idx;
237+
}
238+
// handle right side
239+
if (!joinTuple.RightScope().Maybe<TCoAtom>()) {
240+
return RewriteStreamJoinTuple(idx, equiJoin, joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), args, ctx, changed);
233241
}
234-
const auto left = equiJoin.Arg(0).Cast<TCoEquiJoinInput>().List();
235-
const auto right = equiJoin.Arg(1).Cast<TCoEquiJoinInput>().List();
236-
const auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>();
242+
Y_ENSURE(idx < args.size());
237243
if (!IsStreamLookup(joinTuple)) {
238-
return node;
244+
return idx + 1;
239245
}
240-
if (!right.Maybe<TDqSourceWrap>() && !right.Maybe<TDqReadWrap>()) {
241-
return node;
246+
auto right = equiJoin.Arg(idx).Cast<TCoEquiJoinInput>();
247+
auto rightList = right.List();
248+
if (auto maybeExtractMembers = rightList.Maybe<TCoExtractMembers>()) {
249+
rightList = maybeExtractMembers.Cast().Input();
242250
}
251+
TExprNode::TPtr lookupSourceWrap;
252+
if (auto maybeSource = rightList.Maybe<TDqSourceWrap>()) {
253+
lookupSourceWrap = LookupSourceFromSource(maybeSource.Cast(), ctx).Ptr();
254+
} else if (auto maybeRead = rightList.Maybe<TDqReadWrap>()) {
255+
lookupSourceWrap = LookupSourceFromRead(maybeRead.Cast(), ctx).Ptr();
256+
} else {
257+
return idx + 1;
258+
}
259+
changed = true;
260+
args[idx] =
261+
Build<TCoEquiJoinInput>(ctx, joinTuple.Pos())
262+
.List(lookupSourceWrap)
263+
.Scope(right.Scope())
264+
.Done().Ptr();
265+
return idx + 1;
266+
}
243267

244-
TDqLookupSourceWrap lookupSourceWrap = right.Maybe<TDqSourceWrap>()
245-
? LookupSourceFromSource(right.Cast<TDqSourceWrap>(), ctx)
246-
: LookupSourceFromRead(right.Cast<TDqReadWrap>(), ctx)
247-
;
248-
249-
return Build<TCoEquiJoin>(ctx, node.Pos())
250-
.Add(equiJoin.Arg(0))
251-
.Add<TCoEquiJoinInput>()
252-
.List(lookupSourceWrap)
253-
.Scope(equiJoin.Arg(1).Cast<TCoEquiJoinInput>().Scope())
254-
.Build()
255-
.Add(equiJoin.Arg(2))
256-
.Add(equiJoin.Arg(3))
257-
.Done();
268+
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
269+
const auto equiJoin = node.Cast<TCoEquiJoin>();
270+
auto argCount = equiJoin.ArgCount();
271+
const auto joinTuple = equiJoin.Arg(argCount - 2).Cast<TCoEquiJoinTuple>();
272+
std::vector<TExprNode::TPtr> args(argCount);
273+
bool changed = false;
274+
auto rightIdx = RewriteStreamJoinTuple(0u, equiJoin, joinTuple, args, ctx, changed);
275+
Y_ENSURE(rightIdx + 2 == argCount);
276+
if (!changed) {
277+
return node;
278+
}
279+
// fill copies of remaining args
280+
for (ui32 i = 0; i < argCount; ++i) {
281+
if (!args[i]) {
282+
args[i] = equiJoin.Arg(i).Ptr();
283+
}
284+
}
285+
return Build<TCoEquiJoin>(ctx, node.Pos()).Add(std::move(args)).Done();
258286
}
259287

260288
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,10 +312,16 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
312312
if (!maxDelayedRows) {
313313
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
314314
}
315+
auto rightInput = join.RightInput().Ptr();
316+
if (auto maybe = TExprBase(rightInput).Maybe<TCoExtractMembers>()) {
317+
rightInput = maybe.Cast().Input().Ptr();
318+
}
319+
auto leftLabel = join.LeftLabel().Maybe<NNodes::TCoAtom>() ? join.LeftLabel().Cast<NNodes::TCoAtom>().Ptr() : ctx.NewAtom(pos, "");
320+
Y_ENSURE(join.RightLabel().Maybe<NNodes::TCoAtom>());
315321
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
316322
.Output(left.Output().Cast())
317-
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
318-
.RightInput(join.RightInput())
323+
.LeftLabel(leftLabel)
324+
.RightInput(rightInput)
319325
.RightLabel(join.RightLabel().Cast<NNodes::TCoAtom>())
320326
.JoinKeys(join.JoinKeys())
321327
.JoinType(join.JoinType())

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ namespace NYql::NDqs {
608608
settings.SetRightLabel(streamLookup.RightLabel().StringValue());
609609
settings.SetJoinType(streamLookup.JoinType().StringValue());
610610
for (const auto& k: streamLookup.LeftJoinKeyNames()) {
611-
*settings.AddLeftJoinKeyNames() = RemoveAliases(k.StringValue());
611+
*settings.AddLeftJoinKeyNames() = streamLookup.LeftLabel().StringValue().empty() ? k.StringValue() : RemoveAliases(k.StringValue());
612612
}
613613
for (const auto& k: streamLookup.RightJoinKeyNames()) {
614614
*settings.AddRightJoinKeyNames() = RemoveAliases(k.StringValue());

ydb/tests/fq/generic/streaming/test_join.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,115 @@ def freeze(json):
412412
]
413413
),
414414
),
415+
# 8
416+
(
417+
R'''
418+
$input = SELECT * FROM myyds.`{input_topic}`
419+
WITH (
420+
FORMAT=json_each_row,
421+
SCHEMA (
422+
za Int32,
423+
yb STRING,
424+
yc Int32,
425+
zd Int32,
426+
)
427+
) ;
428+
429+
$enriched1 = select a, b, c, d, e, f, za, yb, yc, zd
430+
from
431+
$input as e
432+
left join {streamlookup} any ydb_conn_{table_name}.db as u
433+
on(e.za = u.a AND e.yb = u.b)
434+
;
435+
436+
$enriched2 = SELECT e.a AS a, e.b AS b, e.c AS c, e.d AS d, e.e AS e, e.f AS f, za, yb, yc, zd, u.c AS c2, u.d AS d2
437+
from
438+
$enriched1 as e
439+
left join {streamlookup} any ydb_conn_{table_name}.db as u
440+
on(e.za = u.a AND e.yb = u.b)
441+
;
442+
443+
$enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2
444+
from
445+
$enriched2 as e
446+
;
447+
448+
insert into myyds.`{output_topic}`
449+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
450+
''',
451+
ResequenceId(
452+
[
453+
(
454+
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
455+
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}',
456+
),
457+
(
458+
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
459+
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}',
460+
),
461+
(
462+
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
463+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
464+
),
465+
(
466+
'{"id":3,"za":null,"yb":"1","yc":114,"zd":115}',
467+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
468+
),
469+
]
470+
),
471+
),
472+
# 9
473+
(
474+
R'''
475+
$input = SELECT * FROM myyds.`{input_topic}`
476+
WITH (
477+
FORMAT=json_each_row,
478+
SCHEMA (
479+
a Int32,
480+
b STRING,
481+
c Int32,
482+
d Int32,
483+
)
484+
) ;
485+
486+
$enriched12 = select u.a as a, u.b as b, u.c as c, u.d as d, u.e as e, u.f as f, e.a as za, e.b as yb, e.c as yc, e.d as zd, u2.c as c2, u2.d as d2
487+
from
488+
$input as e
489+
left join {streamlookup} any ydb_conn_{table_name}.db as u
490+
on(e.a = u.a AND e.b = u.b)
491+
left join {streamlookup} any ydb_conn_{table_name}.db as u2
492+
on(e.b = u2.b AND e.a = u2.a)
493+
;
494+
495+
$enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2
496+
from
497+
$enriched12 as e
498+
;
499+
500+
insert into myyds.`{output_topic}`
501+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
502+
''',
503+
ResequenceId(
504+
[
505+
(
506+
'{"id":1,"a":1,"b":"2","c":100,"d":101}',
507+
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}',
508+
),
509+
(
510+
'{"id":2,"a":7,"b":"8","c":106,"d":107}',
511+
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}',
512+
),
513+
(
514+
'{"id":3,"a":2,"b":"1","c":114,"d":115}',
515+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
516+
),
517+
(
518+
'{"id":3,"a":null,"b":"1","c":114,"d":115}',
519+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
520+
),
521+
]
522+
),
523+
),
415524
]
416525

417526

ydb/tests/fq/generic/streaming/ydb/01_basic.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ set -ex
3535
(56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2),
3636
(18, 17, "ivalid ip", "newUser", 12);
3737
COMMIT;
38-
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, PRIMARY KEY(b, a));
38+
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, g Int32, h Int32, PRIMARY KEY(b, a));
3939
COMMIT;
4040
INSERT INTO db (a, b, c, d, e, f) VALUES
4141
(1, "2", 3, 4, 5, 6),

0 commit comments

Comments
 (0)