Skip to content

Commit 8783e92

Browse files
yumkamyumkam7
andauthored
parse and pass streamlookup parameters (backport a079326 #12548) (#13095)
Co-authored-by: yumkam7 <yumkam7@yandex-team.com>
1 parent d9ef298 commit 8783e92

File tree

12 files changed

+183
-40
lines changed

12 files changed

+183
-40
lines changed

ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
"Match": {"Type": "Callable", "Name": "DqJoin"},
4545
"Children": [
4646
{"Index": 8, "Name": "JoinAlgo", "Type": "TCoAtom"},
47-
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
47+
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true},
48+
{"Index": 10, "Name": "JoinAlgoOptions", "Type": "TCoNameValueTupleList", "Optional": true}
4849
]
4950
},
5051
{

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
242242
rightJoinKeyNames.emplace_back(rightColumnName);
243243
}
244244

245-
if (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode || !(leftAny || rightAny)) {
245+
if ((linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin && (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode)) || !(leftAny || rightAny || !linkSettings.JoinAlgoOptions.empty())) {
246246
auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
247247
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, leftAny))
248248
.LeftLabel(leftTableLabel)
@@ -266,6 +266,15 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
266266
if (rightAny)
267267
flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "RightAny", TNodeFlags::Default));
268268

269+
TVector<TCoNameValueTuple> joinAlgoOptions;
270+
for (ui32 i = 0; i + 1 < linkSettings.JoinAlgoOptions.size(); i += 2) {
271+
joinAlgoOptions.push_back(
272+
Build<TCoNameValueTuple>(ctx, joinTuple.Pos())
273+
.Name().Build(linkSettings.JoinAlgoOptions[i])
274+
.Value<TCoAtom>().Build(linkSettings.JoinAlgoOptions[i + 1])
275+
.Done());
276+
}
277+
269278
auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
270279
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, false))
271280
.LeftLabel(leftTableLabel)
@@ -280,9 +289,11 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
280289
.Add(rightJoinKeyNames)
281290
.Build()
282291
.JoinAlgo(joinAlgo)
283-
.Flags().Add(std::move(flags)).Build()
284-
.Done();
285-
return TJoinInputDesc(Nothing(), dqJoin, std::move(resultKeys));
292+
.Flags().Add(std::move(flags)).Build();
293+
if (!joinAlgoOptions.empty()) {
294+
dqJoin.JoinAlgoOptions().Add(std::move(joinAlgoOptions)).Build();
295+
}
296+
return TJoinInputDesc(Nothing(), dqJoin.Done(), std::move(resultKeys));
286297
}
287298
}
288299

ydb/library/yql/dq/type_ann/dq_type_ann.cpp

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include "dq_type_ann.h"
2-
32
#include <yql/essentials/core/yql_expr_type_annotation.h>
43
#include <yql/essentials/core/yql_join.h>
54
#include <yql/essentials/core/yql_opt_utils.h>
@@ -85,6 +84,16 @@ const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStruc
8584
return result;
8685
}
8786

87+
template <typename TType>
88+
bool EnsureConvertibleTo(const TExprNode& value, const TStringBuf name, TExprContext& ctx) {
89+
auto&& stringValue = value.Content();
90+
if (!TryFromString<TType>(stringValue)) {
91+
ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder() << "Unsupported " << name << " value: " << stringValue));
92+
return false;
93+
}
94+
return true;
95+
}
96+
8897
template <typename TStage>
8998
TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
9099
if (!EnsureMinMaxArgsCount(*stage, 3, 4, ctx)) {
@@ -428,7 +437,7 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp
428437

429438
template <bool IsMapJoin>
430439
const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool stream, TExprContext& ctx) {
431-
if (!EnsureMinMaxArgsCount(*input, 8, 10, ctx)) {
440+
if (!EnsureMinMaxArgsCount(*input, 8, 11, ctx)) {
432441
return nullptr;
433442
}
434443

@@ -444,7 +453,8 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
444453
}
445454
}
446455

447-
if (!EnsureAtom(*input->Child(TDqJoin::idx_JoinType), ctx)) {
456+
const auto& joinType = *input->Child(TDqJoin::idx_JoinType);
457+
if (!EnsureAtom(joinType, ctx)) {
448458
return nullptr;
449459
}
450460

@@ -502,9 +512,39 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
502512
? join.RightLabel().Cast<TCoAtom>().Value()
503513
: TStringBuf("");
504514

505-
if (input->ChildrenSize() > 9U) {
506-
for (auto i = 0U; i < input->Tail().ChildrenSize(); ++i) {
507-
if (const auto& flag = *input->Tail().Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
515+
if (input->ChildrenSize() > TDqJoin::idx_JoinAlgoOptions) {
516+
const auto& joinAlgo = *input->Child(TDqJoin::idx_JoinAlgo);
517+
if (!EnsureAtom(joinAlgo, ctx)) {
518+
return nullptr;
519+
}
520+
auto& joinAlgoOptions = *input->Child(TDqJoin::idx_JoinAlgoOptions);
521+
for (ui32 i = 0; i < joinAlgoOptions.ChildrenSize(); ++i) {
522+
auto& joinAlgoOption = *joinAlgoOptions.Child(i);
523+
if (!EnsureTupleOfAtoms(joinAlgoOption, ctx) || !EnsureTupleMinSize(joinAlgoOption, 1, ctx)) {
524+
return nullptr;
525+
}
526+
auto& name = *joinAlgoOption.Child(TCoNameValueTuple::idx_Name);
527+
if (joinAlgo.IsAtom("StreamLookupJoin")) {
528+
if (name.IsAtom({"TTL", "MaxCachedRows", "MaxDelayedRows"})) {
529+
if (!EnsureTupleSize(joinAlgoOption, 2, ctx)) {
530+
return nullptr;
531+
}
532+
auto& value = *joinAlgoOption.Child(TCoNameValueTuple::idx_Value);
533+
if (!EnsureConvertibleTo<ui64>(value, name.Content(), ctx)) {
534+
return nullptr;
535+
}
536+
continue;
537+
}
538+
}
539+
ctx.AddError(TIssue(ctx.GetPosition(joinAlgoOption.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join option: " << name.Content()));
540+
return nullptr;
541+
}
542+
}
543+
544+
if (input->ChildrenSize() > TDqJoin::idx_Flags) {
545+
auto& flags = *input->Child(TDqJoin::idx_Flags);
546+
for (auto i = 0U; i < flags.ChildrenSize(); ++i) {
547+
if (const auto& flag = *flags.Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
508548
ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "Unsupported DQ join option: " << flag.Content()));
509549
return nullptr;
510550
}
@@ -581,6 +621,10 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
581621
if (!leftInputType) {
582622
return TStatus::Error;
583623
}
624+
if (auto joinType = cnStreamLookup.JoinType(); joinType != TStringBuf("Left")) {
625+
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
626+
return TStatus::Error;
627+
}
584628
const auto leftRowType = GetSeqItemType(leftInputType);
585629
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
586630
const auto outputRowType = GetDqJoinResultType<true>(
@@ -593,7 +637,11 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
593637
cnStreamLookup.JoinKeys(),
594638
ctx
595639
);
596-
//TODO (YQ-2068) verify lookup parameters
640+
if (!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxCachedRows().Ref(), "MaxCachedRows", ctx) ||
641+
!EnsureConvertibleTo<ui64>(cnStreamLookup.TTL().Ref(), "TTL", ctx) ||
642+
!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxDelayedRows().Ref(), "MaxDelayedRows", ctx)) {
643+
return TStatus::Error;
644+
}
597645
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
598646
return TStatus::Ok;
599647
}
@@ -1212,16 +1260,13 @@ bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) {
12121260
return false;
12131261
}
12141262

1215-
if (name == LogicalIdSettingName && !TryFromString<ui64>(value->Content())) {
1216-
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain ui64 value, but got: " << value->Content()));
1263+
if (name == LogicalIdSettingName && !EnsureConvertibleTo<ui64>(*value, name, ctx)) {
12171264
return false;
12181265
}
1219-
if (name == BlockStatusSettingName && !TryFromString<EBlockStatus>(value->Content())) {
1220-
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Unsupported " << name << " value: " << value->Content()));
1266+
if (name == BlockStatusSettingName && !EnsureConvertibleTo<EBlockStatus>(*value, name, ctx)) {
12211267
return false;
12221268
}
1223-
if (name == PartitionModeSettingName && !TryFromString<EPartitionMode>(value->Content())) {
1224-
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Unsupported " << name << " value: " << value->Content()));
1269+
if (name == PartitionModeSettingName && !EnsureConvertibleTo<EPartitionMode>(*value, name, ctx)) {
12251270
return false;
12261271
}
12271272
} else if (name == WideChannelsSettingName) {

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,36 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
241241
return DqRewriteLeftPureJoin(node, ctx, *getParents(), IsGlobal);
242242
}
243243

244+
bool ValidateStreamLookupJoinFlags(const TDqJoin& join, TExprContext& ctx) {
245+
bool leftAny = false;
246+
bool rightAny = false;
247+
if (const auto maybeFlags = join.Flags()) {
248+
for (auto&& flag: maybeFlags.Cast()) {
249+
auto&& name = flag.StringValue();
250+
if (name == "LeftAny"sv) {
251+
leftAny = true;
252+
continue;
253+
} else if (name == "RightAny"sv) {
254+
rightAny = true;
255+
continue;
256+
}
257+
}
258+
if (leftAny) {
259+
ctx.AddError(TIssue(ctx.GetPosition(maybeFlags.Cast().Pos()), "Streamlookup ANY LEFT join is not implemented"));
260+
return false;
261+
}
262+
}
263+
if (!rightAny) {
264+
if (false) { // Tempoarily change to waring to allow for smooth transition
265+
ctx.AddError(TIssue(ctx.GetPosition(join.Pos()), "Streamlookup: must be LEFT JOIN /*+streamlookup(...)*/ ANY"));
266+
return false;
267+
} else {
268+
ctx.AddWarning(TIssue(ctx.GetPosition(join.Pos()), "(Deprecation) Streamlookup: must be LEFT JOIN /*+streamlookup(...)*/ ANY"));
269+
}
270+
}
271+
return true;
272+
}
273+
244274
TMaybeNode<TExprBase> RewriteStreamLookupJoin(TExprBase node, TExprContext& ctx) {
245275
const auto join = node.Cast<TDqJoin>();
246276
if (join.JoinAlgo().StringValue() != "StreamLookupJoin") {
@@ -252,6 +282,36 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
252282
if (!left) {
253283
return node;
254284
}
285+
286+
if (!ValidateStreamLookupJoinFlags(join, ctx)) {
287+
return {};
288+
}
289+
290+
TExprNode::TPtr ttl;
291+
TExprNode::TPtr maxCachedRows;
292+
TExprNode::TPtr maxDelayedRows;
293+
if (const auto maybeOptions = join.JoinAlgoOptions()) {
294+
for (auto&& option: maybeOptions.Cast()) {
295+
auto&& name = option.Name().Value();
296+
if (name == "TTL"sv) {
297+
ttl = option.Value().Cast().Ptr();
298+
} else if (name == "MaxCachedRows"sv) {
299+
maxCachedRows = option.Value().Cast().Ptr();
300+
} else if (name == "MaxDelayedRows"sv) {
301+
maxDelayedRows = option.Value().Cast().Ptr();
302+
}
303+
}
304+
}
305+
306+
if (!ttl) {
307+
ttl = ctx.NewAtom(pos, 300);
308+
}
309+
if (!maxCachedRows) {
310+
maxCachedRows = ctx.NewAtom(pos, 1'000'000);
311+
}
312+
if (!maxDelayedRows) {
313+
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
314+
}
255315
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
256316
.Output(left.Output().Cast())
257317
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
@@ -261,9 +321,9 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
261321
.JoinType(join.JoinType())
262322
.LeftJoinKeyNames(join.LeftJoinKeyNames())
263323
.RightJoinKeyNames(join.RightJoinKeyNames())
264-
.TTL(ctx.NewAtom(pos, 300)) //TODO configure me
265-
.MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me
266-
.MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //Configure me
324+
.TTL(ttl)
325+
.MaxCachedRows(maxCachedRows)
326+
.MaxDelayedRows(maxDelayedRows)
267327
.Done();
268328

269329
auto lambda = Build<TCoLambda>(ctx, pos)

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -619,9 +619,9 @@ namespace NYql::NDqs {
619619
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
620620
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
621621
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
622-
settings.SetMaxDelayedRows(1'000'000); //TODO configure me
623-
settings.SetCacheLimit(1'000'000); //TODO configure me
624-
settings.SetCacheTtlSeconds(60); //TODO configure me
622+
settings.SetCacheLimit(FromString<ui64>(streamLookup.MaxCachedRows().StringValue()));
623+
settings.SetCacheTtlSeconds(FromString<ui64>(streamLookup.TTL().StringValue()));
624+
settings.SetMaxDelayedRows(FromString<ui64>(streamLookup.MaxDelayedRows().StringValue()));
625625

626626
const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
627627
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());

ydb/tests/fq/generic/streaming/test_join.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def freeze(json):
5353
e.Data as data, u.id as lookup
5454
from
5555
$input as e
56-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
56+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
5757
on(e.Data = u.data)
5858
;
5959
@@ -83,7 +83,7 @@ def freeze(json):
8383
e.Data as data, CAST(e.Data AS Int32) as id, u.data as lookup
8484
from
8585
$input as e
86-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
86+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
8787
on(CAST(e.Data AS Int32) = u.id)
8888
;
8989
@@ -121,7 +121,7 @@ def freeze(json):
121121
u.data as lookup
122122
from
123123
$input as e
124-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
124+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
125125
on(e.user = u.id)
126126
;
127127
@@ -165,7 +165,7 @@ def freeze(json):
165165
u.data as lookup
166166
from
167167
$input as e
168-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
168+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
169169
on(e.user = u.id)
170170
;
171171
@@ -230,7 +230,7 @@ def freeze(json):
230230
u.age as age
231231
from
232232
$input as e
233-
left join {streamlookup} ydb_conn_{table_name}.`users` as u
233+
left join {streamlookup} any ydb_conn_{table_name}.`users` as u
234234
on(e.user = u.id)
235235
;
236236
@@ -270,6 +270,12 @@ def freeze(json):
270270
]
271271
* 1000
272272
),
273+
"TTL",
274+
"10",
275+
"MaxCachedRows",
276+
"5",
277+
"MaxDelayedRows",
278+
"100",
273279
),
274280
# 5
275281
(
@@ -290,7 +296,7 @@ def freeze(json):
290296
eu.id as uid
291297
from
292298
$input as e
293-
left join {streamlookup} ydb_conn_{table_name}.`users` as eu
299+
left join {streamlookup} any ydb_conn_{table_name}.`users` as eu
294300
on(e.user = eu.id)
295301
;
296302
@@ -333,7 +339,7 @@ def freeze(json):
333339
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
334340
from
335341
$input as e
336-
left join {streamlookup} ydb_conn_{table_name}.db as u
342+
left join {streamlookup} any ydb_conn_{table_name}.db as u
337343
on(e.yb = u.b AND e.za = u.a )
338344
;
339345
@@ -378,7 +384,7 @@ def freeze(json):
378384
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
379385
from
380386
$input as e
381-
left join {streamlookup} ydb_conn_{table_name}.db as u
387+
left join {streamlookup} any ydb_conn_{table_name}.db as u
382388
on(e.za = u.a AND e.yb = u.b)
383389
;
384390
@@ -501,12 +507,12 @@ def test_streamlookup(
501507
database_id='local',
502508
)
503509

504-
sql, messages = TESTCASES[testcase]
510+
sql, messages, *options = TESTCASES[testcase]
505511
sql = sql.format(
506512
input_topic=self.input_topic,
507513
output_topic=self.output_topic,
508514
table_name=table_name,
509-
streamlookup=R'/*+ streamlookup() */' if streamlookup else '',
515+
streamlookup=Rf'/*+ streamlookup({" ".join(options)}) */' if streamlookup else '',
510516
)
511517

512518
one_time_waiter.wait()

yql/essentials/core/type_ann/type_ann_join.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ namespace NTypeAnnImpl {
475475

476476
IGraphTransformer::TStatus MapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
477477
Y_UNUSED(output);
478-
478+
479479
if (!EnsureArgsCount(*input, 9, ctx.Expr)) {
480480
return IGraphTransformer::TStatus::Error;
481481
}

0 commit comments

Comments
 (0)