Skip to content

Commit 771c004

Browse files
authored
Merge 81a3bd2 into d78c86c
2 parents d78c86c + 81a3bd2 commit 771c004

File tree

12 files changed

+158
-31
lines changed

12 files changed

+158
-31
lines changed

ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
"Match": {"Type": "Callable", "Name": "DqJoin"},
4545
"Children": [
4646
{"Index": 8, "Name": "JoinAlgo", "Type": "TCoAtom"},
47-
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
47+
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true},
48+
{"Index": 10, "Name": "Options", "Type": "TCoNameValueTupleList", "Optional": true}
4849
]
4950
},
5051
{

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
242242
rightJoinKeyNames.emplace_back(rightColumnName);
243243
}
244244

245-
if (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode || !(leftAny || rightAny)) {
245+
if ((linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin && (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode)) || !(leftAny || rightAny || !linkSettings.Options.empty())) {
246246
auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
247247
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, leftAny))
248248
.LeftLabel(leftTableLabel)
@@ -266,6 +266,14 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
266266
if (rightAny)
267267
flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "RightAny", TNodeFlags::Default));
268268

269+
TVector<TCoNameValueTuple> options;
270+
for (ui32 i = 0; i + 1 < linkSettings.Options.size(); i += 2)
271+
options.push_back(
272+
Build<TCoNameValueTuple>(ctx, joinTuple.Pos())
273+
.Name().Build(linkSettings.Options[i])
274+
.Value<TCoAtom>().Build(linkSettings.Options[i + 1])
275+
.Done());
276+
269277
auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
270278
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, false))
271279
.LeftLabel(leftTableLabel)
@@ -280,9 +288,11 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
280288
.Add(rightJoinKeyNames)
281289
.Build()
282290
.JoinAlgo(joinAlgo)
283-
.Flags().Add(std::move(flags)).Build()
284-
.Done();
285-
return TJoinInputDesc(Nothing(), dqJoin, std::move(resultKeys));
291+
.Flags().Add(std::move(flags)).Build();
292+
if (!options.empty()) {
293+
dqJoin.Options().Add(std::move(options)).Build();
294+
}
295+
return TJoinInputDesc(Nothing(), dqJoin.Done(), std::move(resultKeys));
286296
}
287297
}
288298

ydb/library/yql/dq/type_ann/dq_type_ann.cpp

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp
428428

429429
template <bool IsMapJoin>
430430
const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool stream, TExprContext& ctx) {
431-
if (!EnsureMinMaxArgsCount(*input, 8, 10, ctx)) {
431+
if (!EnsureMinMaxArgsCount(*input, 8, 11, ctx)) {
432432
return nullptr;
433433
}
434434

@@ -444,7 +444,8 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
444444
}
445445
}
446446

447-
if (!EnsureAtom(*input->Child(TDqJoin::idx_JoinType), ctx)) {
447+
const auto& joinType = *input->Child(TDqJoin::idx_JoinType);
448+
if (!EnsureAtom(joinType, ctx)) {
448449
return nullptr;
449450
}
450451

@@ -502,9 +503,35 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
502503
? join.RightLabel().Cast<TCoAtom>().Value()
503504
: TStringBuf("");
504505

505-
if (input->ChildrenSize() > 9U) {
506-
for (auto i = 0U; i < input->Tail().ChildrenSize(); ++i) {
507-
if (const auto& flag = *input->Tail().Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
506+
if (input->ChildrenSize() > TDqJoin::idx_Options) {
507+
const auto& joinAlgo = *input->Child(TDqJoin::idx_JoinAlgo);
508+
if (!EnsureAtom(joinAlgo, ctx)) {
509+
return nullptr;
510+
}
511+
if (input->ChildrenSize() > TDqJoin::idx_Options) {
512+
auto& options = *input->Child(TDqJoin::idx_Options);
513+
for (ui32 i = 0; i < options.ChildrenSize(); ++i) {
514+
auto& option = *options.Child(i);
515+
if (!EnsureTupleOfAtoms(option, ctx) || !EnsureTupleMinSize(option, 1, ctx))
516+
return nullptr;
517+
auto& name = *option.Child(TCoNameValueTuple::idx_Name);
518+
if (joinAlgo.IsAtom("StreamLookupJoin")) {
519+
if (name.IsAtom({"TTL", "MaxCachedRows", "MaxDelayedRows"})) {
520+
if (!EnsureTupleSize(option, 2, ctx))
521+
return nullptr;
522+
continue;
523+
}
524+
}
525+
ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join option: " << name.Content()));
526+
return nullptr;
527+
}
528+
}
529+
}
530+
531+
if (input->ChildrenSize() > TDqJoin::idx_Flags) {
532+
auto& flags = *input->Child(TDqJoin::idx_Flags);
533+
for (auto i = 0U; i < flags.ChildrenSize(); ++i) {
534+
if (const auto& flag = *flags.Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
508535
ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "Unsupported DQ join option: " << flag.Content()));
509536
return nullptr;
510537
}
@@ -581,6 +608,10 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
581608
if (!leftInputType) {
582609
return TStatus::Error;
583610
}
611+
if (auto joinType = cnStreamLookup.JoinType(); joinType != TStringBuf("Left")) {
612+
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
613+
return TStatus::Error;
614+
}
584615
const auto leftRowType = GetSeqItemType(leftInputType);
585616
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
586617
const auto outputRowType = GetDqJoinResultType<true>(
@@ -593,7 +624,20 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
593624
cnStreamLookup.JoinKeys(),
594625
ctx
595626
);
596-
//TODO (YQ-2068) verify lookup parameters
627+
auto validateIntParam = [&ctx=ctx](auto&& value) {
628+
// matches dq_tasks.proto
629+
if (!TryFromString<ui64>(value.StringValue())) {
630+
ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder() << "Expected integer, but got: " << value.StringValue()));
631+
return false;
632+
}
633+
return true;
634+
};
635+
if (!validateIntParam(cnStreamLookup.MaxCachedRows()))
636+
return TStatus::Error;
637+
if (!validateIntParam(cnStreamLookup.TTL()))
638+
return TStatus::Error;
639+
if (!validateIntParam(cnStreamLookup.MaxDelayedRows()))
640+
return TStatus::Error;
597641
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
598642
return TStatus::Ok;
599643
}

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,59 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
252252
if (!left) {
253253
return node;
254254
}
255+
256+
bool leftAny = false;
257+
bool rightAny = false;
258+
if (const auto maybeFlags = join.Flags()) {
259+
for (auto&& flag: maybeFlags.Cast()) {
260+
auto&& name = flag.StringValue();
261+
if (name == "LeftAny"sv) {
262+
leftAny = true;
263+
continue;
264+
} else if (name == "RightAny"sv) {
265+
rightAny = true;
266+
continue;
267+
}
268+
}
269+
if (leftAny) {
270+
ctx.AddError(TIssue(ctx.GetPosition(maybeFlags.Cast().Pos()), "Streamlookup ANY LEFT join is not implemented"));
271+
return {};
272+
}
273+
}
274+
if (!rightAny) {
275+
if (false) { // Tempoarily change to waring to allow for smooth transition
276+
ctx.AddError(TIssue(ctx.GetPosition(join.Pos()), "Streamlookup: must be LEFT JOIN ANY"));
277+
return {};
278+
} else {
279+
ctx.AddWarning(TIssue(ctx.GetPosition(join.Pos()), "(Deprecation) Streamlookup: must be LEFT JOIN /*+streamlookup(...)*/ ANY"));
280+
}
281+
}
282+
283+
TExprNode::TPtr ttl = nullptr;
284+
TExprNode::TPtr maxCachedRows = nullptr;
285+
TExprNode::TPtr maxDelayedRows = nullptr;
286+
if (const auto maybeOptions = join.Options()) {
287+
for (auto&& option: maybeOptions.Cast()) {
288+
auto&& name = option.Name().Value();
289+
if (name == "TTL"sv) {
290+
ttl = option.Ref().Child(1);
291+
} else if (name == "MaxCachedRows"sv) {
292+
maxCachedRows = option.Ref().Child(1);
293+
} else if (name == "MaxDelayedRows"sv) {
294+
maxDelayedRows = option.Ref().Child(1);
295+
}
296+
}
297+
}
298+
299+
if (!ttl) {
300+
ttl = ctx.NewAtom(pos, 300);
301+
}
302+
if (!maxCachedRows) {
303+
maxCachedRows = ctx.NewAtom(pos, 1'000'000);
304+
}
305+
if (!maxDelayedRows) {
306+
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
307+
}
255308
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
256309
.Output(left.Output().Cast())
257310
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
@@ -261,9 +314,9 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
261314
.JoinType(join.JoinType())
262315
.LeftJoinKeyNames(join.LeftJoinKeyNames())
263316
.RightJoinKeyNames(join.RightJoinKeyNames())
264-
.TTL(ctx.NewAtom(pos, 300)) //TODO configure me
265-
.MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me
266-
.MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //Configure me
317+
.TTL(ttl)
318+
.MaxCachedRows(maxCachedRows)
319+
.MaxDelayedRows(maxDelayedRows)
267320
.Done();
268321

269322
auto lambda = Build<TCoLambda>(ctx, pos)

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -619,9 +619,9 @@ namespace NYql::NDqs {
619619
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
620620
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
621621
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
622-
settings.SetMaxDelayedRows(1'000'000); //TODO configure me
623-
settings.SetCacheLimit(1'000'000); //TODO configure me
624-
settings.SetCacheTtlSeconds(60); //TODO configure me
622+
settings.SetCacheLimit(FromString<ui64>(streamLookup.MaxCachedRows().StringValue()));
623+
settings.SetCacheTtlSeconds(FromString<ui64>(streamLookup.TTL().StringValue()));
624+
settings.SetMaxDelayedRows(FromString<ui64>(streamLookup.MaxDelayedRows().StringValue()));
625625

626626
const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
627627
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());

ydb/tests/fq/generic/streaming/test_join.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def freeze(json):
5353
e.Data as data, u.id as lookup
5454
from
5555
$input as e
56-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
56+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
5757
on(e.Data = u.data)
5858
;
5959
@@ -83,7 +83,7 @@ def freeze(json):
8383
e.Data as data, CAST(e.Data AS Int32) as id, u.data as lookup
8484
from
8585
$input as e
86-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
86+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
8787
on(CAST(e.Data AS Int32) = u.id)
8888
;
8989
@@ -121,7 +121,7 @@ def freeze(json):
121121
u.data as lookup
122122
from
123123
$input as e
124-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
124+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
125125
on(e.user = u.id)
126126
;
127127
@@ -165,7 +165,7 @@ def freeze(json):
165165
u.data as lookup
166166
from
167167
$input as e
168-
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
168+
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
169169
on(e.user = u.id)
170170
;
171171
@@ -230,7 +230,7 @@ def freeze(json):
230230
u.age as age
231231
from
232232
$input as e
233-
left join {streamlookup} ydb_conn_{table_name}.`users` as u
233+
left join {streamlookup} any ydb_conn_{table_name}.`users` as u
234234
on(e.user = u.id)
235235
;
236236
@@ -290,7 +290,7 @@ def freeze(json):
290290
eu.id as uid
291291
from
292292
$input as e
293-
left join {streamlookup} ydb_conn_{table_name}.`users` as eu
293+
left join {streamlookup} any ydb_conn_{table_name}.`users` as eu
294294
on(e.user = eu.id)
295295
;
296296
@@ -333,7 +333,7 @@ def freeze(json):
333333
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
334334
from
335335
$input as e
336-
left join {streamlookup} ydb_conn_{table_name}.db as u
336+
left join {streamlookup} any ydb_conn_{table_name}.db as u
337337
on(e.yb = u.b AND e.za = u.a )
338338
;
339339
@@ -378,7 +378,7 @@ def freeze(json):
378378
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
379379
from
380380
$input as e
381-
left join {streamlookup} ydb_conn_{table_name}.db as u
381+
left join {streamlookup} any ydb_conn_{table_name}.db as u
382382
on(e.za = u.a AND e.yb = u.b)
383383
;
384384

yql/essentials/core/type_ann/type_ann_join.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ namespace NTypeAnnImpl {
475475

476476
IGraphTransformer::TStatus MapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
477477
Y_UNUSED(output);
478-
478+
479479
if (!EnsureArgsCount(*input, 9, ctx.Expr)) {
480480
return IGraphTransformer::TStatus::Error;
481481
}

yql/essentials/core/yql_join.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,16 @@ namespace {
319319
}
320320
}
321321
else if (option.IsAtom("forceSortedMerge") || option.IsAtom("forceStreamLookup")) {
322-
if (!EnsureTupleSize(*child, 1, ctx)) {
323-
return IGraphTransformer::TStatus::Error;
322+
if (option.IsAtom("forceStreamLookup")) {
323+
if (child->ChildrenSize() % 2 == 0) {
324+
ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() <<
325+
"streamlookup() expects KEY VALUE... pairs"));
326+
return IGraphTransformer::TStatus::Error;
327+
}
328+
} else {
329+
if (!EnsureTupleSize(*child, 1, ctx)) {
330+
return IGraphTransformer::TStatus::Error;
331+
}
324332
}
325333
if (hasJoinStrategyHint) {
326334
ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() <<
@@ -1351,9 +1359,13 @@ TEquiJoinLinkSettings GetEquiJoinLinkSettings(const TExprNode& linkSettings) {
13511359
}
13521360

13531361
result.ForceSortedMerge = HasSetting(linkSettings, "forceSortedMerge");
1354-
1355-
if (HasSetting(linkSettings, "forceStreamLookup")) {
1362+
1363+
if (auto streamlookup = GetSetting(linkSettings, "forceStreamLookup")) {
13561364
result.JoinAlgo = EJoinAlgoType::StreamLookupJoin;
1365+
auto size = streamlookup->ChildrenSize();
1366+
for (decltype(size) i = 1; i < size; ++i) {
1367+
result.Options.push_back(TString(streamlookup->Child(i)->Content()));
1368+
}
13571369
}
13581370

13591371
if (HasSetting(linkSettings, "compact")) {

yql/essentials/core/yql_join.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ struct TEquiJoinLinkSettings {
148148
// JOIN implementation may ignore this flags if SortedMerge strategy is not supported
149149
bool ForceSortedMerge = false;
150150
bool Compact = false;
151+
TVector<TString> Options;
151152
};
152153

153154
TEquiJoinLinkSettings GetEquiJoinLinkSettings(const TExprNode& linkSettings);

yql/essentials/sql/v1/join.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,11 @@ class TEquiJoin: public TJoinBase {
502502
if (TJoinLinkSettings::EStrategy::SortedMerge == descr.LinkSettings.Strategy) {
503503
linkOptions = L(linkOptions, Q(Y(Q("forceSortedMerge"))));
504504
} else if (TJoinLinkSettings::EStrategy::StreamLookup == descr.LinkSettings.Strategy) {
505-
linkOptions = L(linkOptions, Q(Y(Q("forceStreamLookup"))));
505+
auto streamlookup = Y(Q("forceStreamLookup"));
506+
for (auto&& option: descr.LinkSettings.Values) {
507+
streamlookup = L(streamlookup, Q(option));
508+
}
509+
linkOptions = L(linkOptions, Q(streamlookup));
506510
} else if (TJoinLinkSettings::EStrategy::ForceMap == descr.LinkSettings.Strategy) {
507511
linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("MapJoin"))));
508512
} else if (TJoinLinkSettings::EStrategy::ForceGrace == descr.LinkSettings.Strategy) {

yql/essentials/sql/v1/source.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ namespace NSQLTranslationV1 {
172172
ForceGrace
173173
};
174174
EStrategy Strategy = EStrategy::Default;
175+
TVector<TString> Values;
175176
bool Compact = false;
176177
};
177178

yql/essentials/sql/v1/sql_select.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ bool CollectJoinLinkSettings(TPosition pos, TJoinLinkSettings& linkSettings, TCo
4343

4444
if (TJoinLinkSettings::EStrategy::Default == linkSettings.Strategy) {
4545
linkSettings.Strategy = newStrategy;
46+
linkSettings.Values = hint.Values;
4647
} else if (newStrategy == linkSettings.Strategy) {
4748
ctx.Error() << "Duplicate join strategy hint";
4849
return false;

0 commit comments

Comments
 (0)