Skip to content

Commit a079326

Browse files
yumkam7yumkam
authored andcommitted
Parse and pass streamlookup parameters, YQL part
commit_hash:1ad24832c82e465562c48bea1189a0187f691cd4 (cherry picked from commit 338a2a1)
1 parent 4292ec7 commit a079326

File tree

6 files changed

+26
-6
lines changed

6 files changed

+26
-6
lines changed

yql/essentials/core/type_ann/type_ann_join.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ namespace NTypeAnnImpl {
475475

476476
IGraphTransformer::TStatus MapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
477477
Y_UNUSED(output);
478-
478+
479479
if (!EnsureArgsCount(*input, 9, ctx.Expr)) {
480480
return IGraphTransformer::TStatus::Error;
481481
}

yql/essentials/core/yql_join.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,16 @@ namespace {
319319
}
320320
}
321321
else if (option.IsAtom("forceSortedMerge") || option.IsAtom("forceStreamLookup")) {
322-
if (!EnsureTupleSize(*child, 1, ctx)) {
323-
return IGraphTransformer::TStatus::Error;
322+
if (option.IsAtom("forceStreamLookup")) {
323+
if (child->ChildrenSize() % 2 == 0) {
324+
ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() <<
325+
"streamlookup() expects KEY VALUE... pairs"));
326+
return IGraphTransformer::TStatus::Error;
327+
}
328+
} else {
329+
if (!EnsureTupleSize(*child, 1, ctx)) {
330+
return IGraphTransformer::TStatus::Error;
331+
}
324332
}
325333
if (hasJoinStrategyHint) {
326334
ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() <<
@@ -1351,9 +1359,14 @@ TEquiJoinLinkSettings GetEquiJoinLinkSettings(const TExprNode& linkSettings) {
13511359
}
13521360

13531361
result.ForceSortedMerge = HasSetting(linkSettings, "forceSortedMerge");
1354-
1355-
if (HasSetting(linkSettings, "forceStreamLookup")) {
1362+
1363+
if (auto streamlookup = GetSetting(linkSettings, "forceStreamLookup")) {
1364+
YQL_ENSURE(result.JoinAlgoOptions.empty());
13561365
result.JoinAlgo = EJoinAlgoType::StreamLookupJoin;
1366+
auto size = streamlookup->ChildrenSize();
1367+
for (decltype(size) i = 1; i < size; ++i) {
1368+
result.JoinAlgoOptions.push_back(TString(streamlookup->Child(i)->Content()));
1369+
}
13571370
}
13581371

13591372
if (HasSetting(linkSettings, "compact")) {

yql/essentials/core/yql_join.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ struct TEquiJoinLinkSettings {
148148
// JOIN implementation may ignore this flags if SortedMerge strategy is not supported
149149
bool ForceSortedMerge = false;
150150
bool Compact = false;
151+
TVector<TString> JoinAlgoOptions;
151152
};
152153

153154
TEquiJoinLinkSettings GetEquiJoinLinkSettings(const TExprNode& linkSettings);

yql/essentials/sql/v1/join.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,11 @@ class TEquiJoin: public TJoinBase {
502502
if (TJoinLinkSettings::EStrategy::SortedMerge == descr.LinkSettings.Strategy) {
503503
linkOptions = L(linkOptions, Q(Y(Q("forceSortedMerge"))));
504504
} else if (TJoinLinkSettings::EStrategy::StreamLookup == descr.LinkSettings.Strategy) {
505-
linkOptions = L(linkOptions, Q(Y(Q("forceStreamLookup"))));
505+
auto streamlookup = Y(Q("forceStreamLookup"));
506+
for (auto&& option: descr.LinkSettings.Values) {
507+
streamlookup = L(streamlookup, Q(option));
508+
}
509+
linkOptions = L(linkOptions, Q(streamlookup));
506510
} else if (TJoinLinkSettings::EStrategy::ForceMap == descr.LinkSettings.Strategy) {
507511
linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("MapJoin"))));
508512
} else if (TJoinLinkSettings::EStrategy::ForceGrace == descr.LinkSettings.Strategy) {

yql/essentials/sql/v1/source.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ namespace NSQLTranslationV1 {
172172
ForceGrace
173173
};
174174
EStrategy Strategy = EStrategy::Default;
175+
TVector<TString> Values;
175176
bool Compact = false;
176177
};
177178

yql/essentials/sql/v1/sql_select.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ bool CollectJoinLinkSettings(TPosition pos, TJoinLinkSettings& linkSettings, TCo
4343

4444
if (TJoinLinkSettings::EStrategy::Default == linkSettings.Strategy) {
4545
linkSettings.Strategy = newStrategy;
46+
linkSettings.Values = hint.Values;
4647
} else if (newStrategy == linkSettings.Strategy) {
4748
ctx.Error() << "Duplicate join strategy hint";
4849
return false;

0 commit comments

Comments
 (0)