Skip to content

Parse and pass streamlookup parameters #10381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
041f4a3
change DqJoin Flags from TCoAtomList into TCoNameValueTupleList
yumkam Oct 18, 2024
ed8589d
pass streamlookup() join parameters
yumkam Oct 18, 2024
c124abd
Reject unsupported streamlookup variants
yumkam Oct 18, 2024
60f010a
fix rebase error
yumkam Oct 18, 2024
62f0faa
fix typo
yumkam Oct 18, 2024
165c892
fix ut
yumkam Oct 18, 2024
8415564
fix index out of range
yumkam Oct 18, 2024
67706a7
fix typo
yumkam Oct 19, 2024
5da0d3f
fix streamlookup sql to match new requirements
yumkam Oct 21, 2024
ed2b182
rephrase error message
yumkam Oct 21, 2024
79aa0af
tests/fq/generic/streaming: update join to ANY
yumkam Oct 21, 2024
ab29f16
code style fixes
yumkam Oct 22, 2024
b2a3037
move validation to correct place
yumkam Oct 22, 2024
0cb9a55
Add test with parameters
yumkam Oct 23, 2024
0650a1f
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Oct 23, 2024
2883320
fix typo in new test
yumkam Oct 24, 2024
2263e6b
recanonize tests
yumkam Oct 24, 2024
67fbbc0
Style fixes
yumkam Oct 24, 2024
ae2aa80
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Oct 30, 2024
6791f02
Fix merge conflict with #8169
yumkam Oct 30, 2024
f6a06b0
fixup! Fix merge conflict with #8169
yumkam Oct 30, 2024
94c967b
style fixes
yumkam Nov 2, 2024
7789fc8
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Nov 2, 2024
38e6436
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Nov 13, 2024
12acefa
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Nov 13, 2024
7b5503b
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Nov 15, 2024
423ff1c
review fixes
yumkam Nov 15, 2024
4f98abe
review fixes (IntFromString -> FromString)
yumkam Nov 15, 2024
86e39ef
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Nov 28, 2024
8cbc948
streamlookup: temporary weaken ANY requirement
yumkam Nov 28, 2024
d8f2f45
add forgotten file
yumkam Nov 28, 2024
53bdfbe
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Nov 28, 2024
4a80dc3
(re)canonize
yumkam Dec 3, 2024
3e72fec
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Dec 3, 2024
4b91004
Merge remote-tracking branch 'origin/main' into pass-in-inputtransfor…
yumkam Dec 9, 2024
04fabd6
update canonization
yumkam Dec 9, 2024
44c04e2
fixup
yumkam Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ class TxPlanSerializer {
std::variant<ui32, TArgContext> Visit(const TCoFlatMapBase& flatMap, const TCoGraceJoinCore& join, TQueryPlanNode& planNode) {
auto joinAlgo = "(Grace)";
for (size_t i=0; i<join.Flags().Size(); i++) {
if (join.Flags().Item(i).StringValue() == "Broadcast") {
if (join.Flags().Item(i).Name().StringValue() == "Broadcast") {
joinAlgo = "(MapJoin)";
}
}
Expand All @@ -1533,7 +1533,7 @@ class TxPlanSerializer {
std::variant<ui32, TArgContext> Visit(const TCoGraceJoinCore& join, TQueryPlanNode& planNode) {
auto joinAlgo = "(Grace)";
for (size_t i=0; i<join.Flags().Size(); i++) {
if (join.Flags().Item(i).StringValue() == "Broadcast") {
if (join.Flags().Item(i).Name().StringValue() == "Broadcast") {
joinAlgo = "(MapJoin)";
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
"Match": {"Type": "Callable", "Name": "DqJoin"},
"Children": [
{"Index": 8, "Name": "JoinAlgo", "Type": "TCoAtom"},
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
{"Index": 9, "Name": "Flags", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
"Name": "TDqPhyGraceJoin",
"Base": "TDqJoinBase",
"Match": {"Type": "Callable", "Name": "DqPhyGraceJoin"},
"Children": [
{"Index": 8, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
{"Index": 8, "Name": "Flags", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
Expand Down
84 changes: 53 additions & 31 deletions ydb/library/yql/dq/opt/dq_opt_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
bool rightAny = linkSettings.RightHints.contains("any");

TStringBuf joinType = joinTuple.Type().Value();

TSet<std::pair<TStringBuf, TStringBuf>> resultKeys;
if (joinType != TStringBuf("RightOnly") && joinType != TStringBuf("RightSemi")) {
resultKeys.insert(left->Keys.begin(), left->Keys.end());
Expand Down Expand Up @@ -242,7 +243,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
rightJoinKeyNames.emplace_back(rightColumnName);
}

if (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode || !(leftAny || rightAny)) {
if ((linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin && (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode)) || !(leftAny || rightAny || !linkSettings.Options.empty())) {
auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, leftAny))
.LeftLabel(leftTableLabel)
Expand All @@ -260,11 +261,26 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
.Done();
return TJoinInputDesc(Nothing(), dqJoin, std::move(resultKeys));
} else {
TExprNode::TListType flags;
if (leftAny)
flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "LeftAny", TNodeFlags::Default));
if (rightAny)
flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "RightAny", TNodeFlags::Default));
TVector<TCoNameValueTuple> flags;

if (leftAny) {
flags.push_back(
Build<TCoNameValueTuple>(ctx, joinTuple.Pos())
.Name().Build("LeftAny"sv, TNodeFlags::Default)
.Done());
}
if (rightAny) {
flags.push_back(
Build<TCoNameValueTuple>(ctx, joinTuple.Pos())
.Name().Build("RightAny"sv, TNodeFlags::Default)
.Done());
}
for (ui32 i = 0; i + 1 < linkSettings.Options.size(); i += 2)
flags.push_back(
Build<TCoNameValueTuple>(ctx, joinTuple.Pos())
.Name().Build(linkSettings.Options[i])
.Value<TCoAtom>().Build(linkSettings.Options[i + 1])
.Done());

auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, false))
Expand Down Expand Up @@ -376,8 +392,8 @@ TDqJoinBase DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, co
auto rightFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), rightInput, rightFilterKeys);

if (useGraceCore) {
auto flags = Build<TCoAtomList>(ctx, join.Pos())
.Add<TCoAtom>().Value("Broadcast").Build()
auto flags = Build<TCoNameValueTupleList>(ctx, join.Pos())
.Add().Name().Build("Broadcast").Build()
.Done();

return Build<TDqPhyGraceJoin>(ctx, join.Pos())
Expand Down Expand Up @@ -566,20 +582,26 @@ TExprBase DqRewriteRightJoinToLeft(const TExprBase node, TExprContext& ctx) {
return node;
}

TMaybeNode<TCoAtomList> newFlags;
if (TMaybeNode<TCoAtomList> flags = dqJoin.Flags()) {
auto flagsBuilder = Build<TCoAtomList>(ctx, flags.Cast().Pos());
TMaybeNode<TCoNameValueTupleList> newFlags;
if (TMaybeNode<TCoNameValueTupleList> flags = dqJoin.Flags()) {
TVector<TCoNameValueTuple> list;
for (auto flag: flags.Cast()) {
TStringBuf tail;
if( flag.Value().AfterPrefix("Left", tail)) {
flagsBuilder.Add().Value("Right" + TString(tail)).Build();
} else if ( flag.Value().AfterPrefix("Right", tail)) {
flagsBuilder.Add().Value("Left" + TString(tail)).Build();
if (flag.Name().Value().AfterPrefix("Left", tail)) {
list.push_back(Build<TCoNameValueTuple>(ctx, flag.Pos()).InitFrom(flag)
.Name().Build("Right" + TString(tail))
.Done());
} else if (flag.Name().Value().AfterPrefix("Right", tail)) {
list.push_back(Build<TCoNameValueTuple>(ctx, flag.Pos()).InitFrom(flag)
.Name().Build("Left" + TString(tail))
.Done());
} else {
flagsBuilder.Add(flag);
list.push_back(flag);
}
}
newFlags = flagsBuilder.Done();
newFlags = Build<TCoNameValueTupleList>(ctx, flags.Cast().Pos())
.Add(list)
.Done();
}

auto joinKeysBuilder = Build<TDqJoinKeyTupleList>(ctx, dqJoin.Pos());
Expand Down Expand Up @@ -684,15 +706,13 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
return join;
}

TExprNode::TListType flags;
if (const auto maybeFlags = join.Flags()) {
flags = maybeFlags.Cast().Ref().ChildrenList();
}

for (auto& flag : flags) {
if (flag->IsAtom("LeftAny") || flag->IsAtom("RightAny")) {
ctx.AddError(TIssue(ctx.GetPosition(join.Ptr()->Pos()), "ANY join kind is not currently supported"));
return join;
for (const auto& flag : maybeFlags.Cast()) {
auto name = flag.Name().Value();
if (name == "LeftAny"sv || name == "RightAny"sv) {
ctx.AddError(TIssue(ctx.GetPosition(join.Ptr()->Pos()), "ANY join kind is not currently supported"));
return join;
}
}
}

Expand Down Expand Up @@ -1558,13 +1578,15 @@ TExprBase DqBuildHashJoin(const TDqJoin& join, EHashJoinMode mode, TExprContext&
[[fallthrough]];
case EHashJoinMode::Dict: {
bool leftAny = false, rightAny = false;
for (auto& flag : flags) {
if (flag->IsAtom("LeftAny")) {
TExprNode::TListType dictFlags;
for (const auto& flag : flags) {
const auto name = flag->Head().Content();
if (name == "LeftAny"sv) {
leftAny = true;
flag = ctx.NewAtom(flag->Pos(), "LeftUnique", TNodeFlags::Default);
} else if (flag->IsAtom("RightAny")) {
dictFlags.push_back(ctx.NewAtom(flag->Pos(), "LeftUnique", TNodeFlags::Default));
} else if (name == "RightAny"sv) {
rightAny = true;
flag = ctx.NewAtom(flag->Pos(), "RightUnique", TNodeFlags::Default);
dictFlags.push_back(ctx.NewAtom(flag->Pos(), "RightUnique", TNodeFlags::Default));
}
}

Expand All @@ -1582,7 +1604,7 @@ TExprBase DqBuildHashJoin(const TDqJoin& join, EHashJoinMode mode, TExprContext&
.Arg(0, "left")
.Arg(1, "right")
.Add(2, join.JoinType().Ptr())
.List(3).Add(std::move(flags)).Seal()
.List(3).Add(std::move(dictFlags)).Seal()
.Seal()
.Seal()
.Seal()
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const TExprBase& node, TExprCont
auto rightInput = ExpandJoinInput(*itemTypeRight, ctx.NewCallable(graceJoin.RightInput().Pos(), "ToFlow", {graceJoin.RightInput().Ptr()}), ctx, rightConvertedItems, pos);

TExprNode::TListType flags;
if (auto maybeFlags = graceJoin.Flags().Maybe<TCoAtomList>()) {
if (auto maybeFlags = graceJoin.Flags().Maybe<TCoNameValueTupleList>()) {
flags = maybeFlags.Cast().Ref().ChildrenList();
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ void InferStatisticsForGraceJoin(const TExprNode::TPtr& input, TTypeAnnotationCo

auto joinAlgo = EJoinAlgoType::GraceJoin;
for (size_t i=0; i<join.Flags().Size(); i++) {
if (join.Flags().Item(i).StringValue() == "Broadcast") {
if (join.Flags().Item(i).Name().StringValue() == "Broadcast") {
joinAlgo = EJoinAlgoType::MapJoin;
break;
}
Expand Down
47 changes: 41 additions & 6 deletions ydb/library/yql/dq/type_ann/dq_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
}
}

if (!EnsureAtom(*input->Child(TDqJoin::idx_JoinType), ctx)) {
const auto& joinType = *input->Child(TDqJoin::idx_JoinType);
if (!EnsureAtom(joinType, ctx)) {
return nullptr;
}

Expand Down Expand Up @@ -502,10 +503,27 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
? join.RightLabel().Cast<TCoAtom>().Value()
: TStringBuf("");

if (input->ChildrenSize() > 9U) {
for (auto i = 0U; i < input->Tail().ChildrenSize(); ++i) {
if (const auto& flag = *input->Tail().Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "Unsupported DQ join option: " << flag.Content()));
if (input->ChildrenSize() > TDqJoin::idx_JoinAlgo) {
const auto& joinAlgo = *input->Child(TDqJoin::idx_JoinAlgo);
if (input->ChildrenSize() > TDqJoin::idx_Flags) {
auto&& flags = input->Tail();
for (ui32 i = 0; i < flags.ChildrenSize(); ++i) {
auto&& flag = *flags.Child(i);
if (!EnsureTupleOfAtoms(flag, ctx) || !EnsureTupleMinSize(flag, 1, ctx))
return nullptr;
auto&& name = *flag.Child(TCoNameValueTuple::idx_Name);
if (name.IsAtom({"TTL", "MaxCachedRows", "MaxDelayedRows"})) {
if (joinAlgo.IsAtom("StreamLookupJoin")) {
if (!EnsureTupleSize(flag, 2, ctx))
return nullptr;
continue;
}
} else if (name.IsAtom({"LeftAny", "RightAny"})) {
if (!EnsureTupleSize(flag, 1, ctx))
return nullptr;
continue;
}
ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join option: " << name.Content()));
return nullptr;
}
}
Expand Down Expand Up @@ -581,6 +599,10 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
if (!leftInputType) {
return TStatus::Error;
}
if (auto joinType = cnStreamLookup.JoinType(); joinType != TStringBuf("Left")) {
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
return TStatus::Error;
}
const auto leftRowType = GetSeqItemType(leftInputType);
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
const auto outputRowType = GetDqJoinResultType<true>(
Expand All @@ -593,7 +615,20 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
cnStreamLookup.JoinKeys(),
ctx
);
//TODO (YQ-2068) verify lookup parameters
auto validateIntParam = [&ctx=ctx](auto&& value) {
// matches dq_tasks.proto
if (!TryFromString<ui64>(value.StringValue())) {
ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder() << "Expected integer, but got: " << value.StringValue()));
return false;
}
return true;
};
if (!validateIntParam(cnStreamLookup.MaxCachedRows()))
return TStatus::Error;
if (!validateIntParam(cnStreamLookup.TTL()))
return TStatus::Error;
if (!validateIntParam(cnStreamLookup.MaxDelayedRows()))
return TStatus::Error;
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
return TStatus::Ok;
}
Expand Down
51 changes: 48 additions & 3 deletions ydb/library/yql/providers/dq/opt/physical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,51 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
if (!left) {
return node;
}
TExprNode::TPtr ttl = nullptr;
TExprNode::TPtr maxCachedRows = nullptr;
TExprNode::TPtr maxDelayedRows = nullptr;
bool leftAny = false;
bool rightAny = false;
if (const auto maybeFlags = join.Flags()) {
for (auto&& flag: maybeFlags.Cast()) {
auto&& name = flag.Name().Value();
if (name == "TTL"sv) {
ttl = flag.Ref().Child(1);
} else if (name == "MaxCachedRows"sv) {
maxCachedRows = flag.Ref().Child(1);
} else if (name == "MaxDelayedRows"sv) {
maxDelayedRows = flag.Ref().Child(1);
} else if (name == "LeftAny"sv) {
leftAny = true;
continue;
} else if (name == "RightAny"sv) {
rightAny = true;
continue;
}
}
if (leftAny) {
ctx.AddError(TIssue(ctx.GetPosition(maybeFlags.Cast().Pos()), "Streamlookup ANY LEFT join is not implemented"));
return {};
}
}
if (!rightAny) {
if (false) { // Tempoarily change to waring to allow for smooth transition
ctx.AddError(TIssue(ctx.GetPosition(join.Pos()), "Streamlookup: must be LEFT JOIN ANY"));
return {};
} else {
ctx.AddWarning(TIssue(ctx.GetPosition(join.Pos()), "(Deprecation) Streamlookup: must be LEFT JOIN /*+streamlookup(...)*/ ANY"));
}
}

if (!ttl) {
ttl = ctx.NewAtom(pos, 300);
}
if (!maxCachedRows) {
maxCachedRows = ctx.NewAtom(pos, 1'000'000);
}
if (!maxDelayedRows) {
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
}
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
.Output(left.Output().Cast())
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
Expand All @@ -261,9 +306,9 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
.JoinType(join.JoinType())
.LeftJoinKeyNames(join.LeftJoinKeyNames())
.RightJoinKeyNames(join.RightJoinKeyNames())
.TTL(ctx.NewAtom(pos, 300)) //TODO configure me
.MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me
.MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //Configure me
.TTL(ttl)
.MaxCachedRows(maxCachedRows)
.MaxDelayedRows(maxDelayedRows)
.Done();

auto lambda = Build<TCoLambda>(ctx, pos)
Expand Down
6 changes: 3 additions & 3 deletions ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,9 @@ namespace NYql::NDqs {
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
settings.SetMaxDelayedRows(1'000'000); //TODO configure me
settings.SetCacheLimit(1'000'000); //TODO configure me
settings.SetCacheTtlSeconds(60); //TODO configure me
settings.SetCacheLimit(FromString<ui64>(streamLookup.MaxCachedRows().StringValue()));
settings.SetCacheTtlSeconds(FromString<ui64>(streamLookup.TTL().StringValue()));
settings.SetMaxDelayedRows(FromString<ui64>(streamLookup.MaxDelayedRows().StringValue()));

const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase {
bool leftAny = false, rightAny = false;
if (const auto maybeJoin = join.Maybe<TDqJoin>()) {
if (const auto maybeFlags = maybeJoin.Cast().Flags()) {
maybeFlags.Cast().Ref().ForEachChild([&](const TExprNode& flag) {
if (flag.IsAtom("LeftAny"))
for (const auto& flag: maybeFlags.Cast()) {
const auto name = flag.Name().Value();
if (name == "LeftAny"sv)
leftAny = true;
else if (flag.IsAtom("RightAny"))
else if (name == "RightAny"sv)
rightAny = true;
});
}
}
}

Expand Down
Loading
Loading