Skip to content

parse and pass streamlookup parameters, YDB part #12548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
"Match": {"Type": "Callable", "Name": "DqJoin"},
"Children": [
{"Index": 8, "Name": "JoinAlgo", "Type": "TCoAtom"},
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true},
{"Index": 10, "Name": "JoinAlgoOptions", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
Expand Down
19 changes: 15 additions & 4 deletions ydb/library/yql/dq/opt/dq_opt_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
rightJoinKeyNames.emplace_back(rightColumnName);
}

if (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode || !(leftAny || rightAny)) {
if ((linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin && (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode)) || !(leftAny || rightAny || !linkSettings.JoinAlgoOptions.empty())) {
auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, leftAny))
.LeftLabel(leftTableLabel)
Expand All @@ -266,6 +266,15 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
if (rightAny)
flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "RightAny", TNodeFlags::Default));

TVector<TCoNameValueTuple> joinAlgoOptions;
for (ui32 i = 0; i + 1 < linkSettings.JoinAlgoOptions.size(); i += 2) {
joinAlgoOptions.push_back(
Build<TCoNameValueTuple>(ctx, joinTuple.Pos())
.Name().Build(linkSettings.JoinAlgoOptions[i])
.Value<TCoAtom>().Build(linkSettings.JoinAlgoOptions[i + 1])
.Done());
}

auto dqJoin = Build<TDqJoin>(ctx, joinTuple.Pos())
.LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, false))
.LeftLabel(leftTableLabel)
Expand All @@ -280,9 +289,11 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
.Add(rightJoinKeyNames)
.Build()
.JoinAlgo(joinAlgo)
.Flags().Add(std::move(flags)).Build()
.Done();
return TJoinInputDesc(Nothing(), dqJoin, std::move(resultKeys));
.Flags().Add(std::move(flags)).Build();
if (!joinAlgoOptions.empty()) {
dqJoin.JoinAlgoOptions().Add(std::move(joinAlgoOptions)).Build();
}
return TJoinInputDesc(Nothing(), dqJoin.Done(), std::move(resultKeys));
}
}

Expand Down
71 changes: 58 additions & 13 deletions ydb/library/yql/dq/type_ann/dq_type_ann.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "dq_type_ann.h"

#include <yql/essentials/core/yql_expr_type_annotation.h>
#include <yql/essentials/core/yql_join.h>
#include <yql/essentials/core/yql_opt_utils.h>
Expand Down Expand Up @@ -85,6 +84,16 @@ const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStruc
return result;
}

template <typename TType>
bool EnsureConvertibleTo(const TExprNode& value, const TStringBuf name, TExprContext& ctx) {
auto&& stringValue = value.Content();
if (!TryFromString<TType>(stringValue)) {
ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder() << "Unsupported " << name << " value: " << stringValue));
return false;
}
return true;
}

template <typename TStage>
TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*stage, 3, 4, ctx)) {
Expand Down Expand Up @@ -428,7 +437,7 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp

template <bool IsMapJoin>
const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool stream, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 8, 10, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 8, 11, ctx)) {
return nullptr;
}

Expand All @@ -444,7 +453,8 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
}
}

if (!EnsureAtom(*input->Child(TDqJoin::idx_JoinType), ctx)) {
const auto& joinType = *input->Child(TDqJoin::idx_JoinType);
if (!EnsureAtom(joinType, ctx)) {
return nullptr;
}

Expand Down Expand Up @@ -502,9 +512,39 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st
? join.RightLabel().Cast<TCoAtom>().Value()
: TStringBuf("");

if (input->ChildrenSize() > 9U) {
for (auto i = 0U; i < input->Tail().ChildrenSize(); ++i) {
if (const auto& flag = *input->Tail().Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
if (input->ChildrenSize() > TDqJoin::idx_JoinAlgoOptions) {
const auto& joinAlgo = *input->Child(TDqJoin::idx_JoinAlgo);
if (!EnsureAtom(joinAlgo, ctx)) {
return nullptr;
}
auto& joinAlgoOptions = *input->Child(TDqJoin::idx_JoinAlgoOptions);
for (ui32 i = 0; i < joinAlgoOptions.ChildrenSize(); ++i) {
auto& joinAlgoOption = *joinAlgoOptions.Child(i);
if (!EnsureTupleOfAtoms(joinAlgoOption, ctx) || !EnsureTupleMinSize(joinAlgoOption, 1, ctx)) {
return nullptr;
}
auto& name = *joinAlgoOption.Child(TCoNameValueTuple::idx_Name);
if (joinAlgo.IsAtom("StreamLookupJoin")) {
if (name.IsAtom({"TTL", "MaxCachedRows", "MaxDelayedRows"})) {
if (!EnsureTupleSize(joinAlgoOption, 2, ctx)) {
return nullptr;
}
auto& value = *joinAlgoOption.Child(TCoNameValueTuple::idx_Value);
if (!EnsureConvertibleTo<ui64>(value, name.Content(), ctx)) {
return nullptr;
}
continue;
}
}
ctx.AddError(TIssue(ctx.GetPosition(joinAlgoOption.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join option: " << name.Content()));
return nullptr;
}
}

if (input->ChildrenSize() > TDqJoin::idx_Flags) {
auto& flags = *input->Child(TDqJoin::idx_Flags);
for (auto i = 0U; i < flags.ChildrenSize(); ++i) {
if (const auto& flag = *flags.Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) {
ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "Unsupported DQ join option: " << flag.Content()));
return nullptr;
}
Expand Down Expand Up @@ -581,6 +621,10 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
if (!leftInputType) {
return TStatus::Error;
}
if (auto joinType = cnStreamLookup.JoinType(); joinType != TStringBuf("Left")) {
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
return TStatus::Error;
}
const auto leftRowType = GetSeqItemType(leftInputType);
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
const auto outputRowType = GetDqJoinResultType<true>(
Expand All @@ -593,7 +637,11 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
cnStreamLookup.JoinKeys(),
ctx
);
//TODO (YQ-2068) verify lookup parameters
if (!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxCachedRows().Ref(), "MaxCachedRows", ctx) ||
!EnsureConvertibleTo<ui64>(cnStreamLookup.TTL().Ref(), "TTL", ctx) ||
!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxDelayedRows().Ref(), "MaxDelayedRows", ctx)) {
return TStatus::Error;
}
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
return TStatus::Ok;
}
Expand Down Expand Up @@ -1212,16 +1260,13 @@ bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) {
return false;
}

if (name == LogicalIdSettingName && !TryFromString<ui64>(value->Content())) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain ui64 value, but got: " << value->Content()));
if (name == LogicalIdSettingName && !EnsureConvertibleTo<ui64>(*value, name, ctx)) {
return false;
}
if (name == BlockStatusSettingName && !TryFromString<EBlockStatus>(value->Content())) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Unsupported " << name << " value: " << value->Content()));
if (name == BlockStatusSettingName && !EnsureConvertibleTo<EBlockStatus>(*value, name, ctx)) {
return false;
}
if (name == PartitionModeSettingName && !TryFromString<EPartitionMode>(value->Content())) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Unsupported " << name << " value: " << value->Content()));
if (name == PartitionModeSettingName && !EnsureConvertibleTo<EPartitionMode>(*value, name, ctx)) {
return false;
}
} else if (name == WideChannelsSettingName) {
Expand Down
66 changes: 63 additions & 3 deletions ydb/library/yql/providers/dq/opt/physical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,36 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
return DqRewriteLeftPureJoin(node, ctx, *getParents(), IsGlobal);
}

bool ValidateStreamLookupJoinFlags(const TDqJoin& join, TExprContext& ctx) {
bool leftAny = false;
bool rightAny = false;
if (const auto maybeFlags = join.Flags()) {
for (auto&& flag: maybeFlags.Cast()) {
auto&& name = flag.StringValue();
if (name == "LeftAny"sv) {
leftAny = true;
continue;
} else if (name == "RightAny"sv) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else избыточен

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, но, кмк, в случае исключающих альтернатив, использовать всегда if () {} else if () {} лучше для консистентности. Меняем/переносим continue, и, внезапно, надо менять и следующую строчку (или, хуже, забываем поменять).

rightAny = true;
continue;
}
}
if (leftAny) {
ctx.AddError(TIssue(ctx.GetPosition(maybeFlags.Cast().Pos()), "Streamlookup ANY LEFT join is not implemented"));
return false;
}
}
if (!rightAny) {
if (false) { // Tempoarily change to waring to allow for smooth transition
ctx.AddError(TIssue(ctx.GetPosition(join.Pos()), "Streamlookup: must be LEFT JOIN /*+streamlookup(...)*/ ANY"));
return false;
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else избыточен

Copy link
Collaborator Author

@yumkam yumkam Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Аналогично. И тут похожий код в обеих ветках, одинаковый отступ упрощает сравнение.
if () { /* коротко */ return; } else { /* длинно, иначе и вложенно */ } -- да, прямой смысл избавиться от else и уменьшить отступ. В данном случае, кмк, нет. (Тем более, что в будущем это if просто уйдёт)

ctx.AddWarning(TIssue(ctx.GetPosition(join.Pos()), "(Deprecation) Streamlookup: must be LEFT JOIN /*+streamlookup(...)*/ ANY"));
}
}
return true;
}

TMaybeNode<TExprBase> RewriteStreamLookupJoin(TExprBase node, TExprContext& ctx) {
const auto join = node.Cast<TDqJoin>();
if (join.JoinAlgo().StringValue() != "StreamLookupJoin") {
Expand All @@ -252,6 +282,36 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
if (!left) {
return node;
}

if (!ValidateStreamLookupJoinFlags(join, ctx)) {
return {};
}

TExprNode::TPtr ttl;
TExprNode::TPtr maxCachedRows;
TExprNode::TPtr maxDelayedRows;
if (const auto maybeOptions = join.JoinAlgoOptions()) {
for (auto&& option: maybeOptions.Cast()) {
auto&& name = option.Name().Value();
if (name == "TTL"sv) {
ttl = option.Value().Cast().Ptr();
} else if (name == "MaxCachedRows"sv) {
maxCachedRows = option.Value().Cast().Ptr();
} else if (name == "MaxDelayedRows"sv) {
maxDelayedRows = option.Value().Cast().Ptr();
}
}
}

if (!ttl) {
ttl = ctx.NewAtom(pos, 300);
}
if (!maxCachedRows) {
maxCachedRows = ctx.NewAtom(pos, 1'000'000);
}
if (!maxDelayedRows) {
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
}
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
.Output(left.Output().Cast())
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
Expand All @@ -261,9 +321,9 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
.JoinType(join.JoinType())
.LeftJoinKeyNames(join.LeftJoinKeyNames())
.RightJoinKeyNames(join.RightJoinKeyNames())
.TTL(ctx.NewAtom(pos, 300)) //TODO configure me
.MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me
.MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //Configure me
.TTL(ttl)
.MaxCachedRows(maxCachedRows)
.MaxDelayedRows(maxDelayedRows)
.Done();

auto lambda = Build<TCoLambda>(ctx, pos)
Expand Down
6 changes: 3 additions & 3 deletions ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,9 @@ namespace NYql::NDqs {
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
settings.SetMaxDelayedRows(1'000'000); //TODO configure me
settings.SetCacheLimit(1'000'000); //TODO configure me
settings.SetCacheTtlSeconds(60); //TODO configure me
settings.SetCacheLimit(FromString<ui64>(streamLookup.MaxCachedRows().StringValue()));
settings.SetCacheTtlSeconds(FromString<ui64>(streamLookup.TTL().StringValue()));
settings.SetMaxDelayedRows(FromString<ui64>(streamLookup.MaxDelayedRows().StringValue()));

const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());
Expand Down
26 changes: 16 additions & 10 deletions ydb/tests/fq/generic/streaming/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def freeze(json):
e.Data as data, u.id as lookup
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
on(e.Data = u.data)
;

Expand Down Expand Up @@ -83,7 +83,7 @@ def freeze(json):
e.Data as data, CAST(e.Data AS Int32) as id, u.data as lookup
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
on(CAST(e.Data AS Int32) = u.id)
;

Expand Down Expand Up @@ -121,7 +121,7 @@ def freeze(json):
u.data as lookup
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
on(e.user = u.id)
;

Expand Down Expand Up @@ -165,7 +165,7 @@ def freeze(json):
u.data as lookup
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.{table_name} as u
left join {streamlookup} any ydb_conn_{table_name}.{table_name} as u
on(e.user = u.id)
;

Expand Down Expand Up @@ -230,7 +230,7 @@ def freeze(json):
u.age as age
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.`users` as u
left join {streamlookup} any ydb_conn_{table_name}.`users` as u
on(e.user = u.id)
;

Expand Down Expand Up @@ -270,6 +270,12 @@ def freeze(json):
]
* 1000
),
"TTL",
"10",
"MaxCachedRows",
"5",
"MaxDelayedRows",
"100",
),
# 5
(
Expand All @@ -290,7 +296,7 @@ def freeze(json):
eu.id as uid
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.`users` as eu
left join {streamlookup} any ydb_conn_{table_name}.`users` as eu
on(e.user = eu.id)
;

Expand Down Expand Up @@ -333,7 +339,7 @@ def freeze(json):
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.db as u
left join {streamlookup} any ydb_conn_{table_name}.db as u
on(e.yb = u.b AND e.za = u.a )
;

Expand Down Expand Up @@ -378,7 +384,7 @@ def freeze(json):
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.db as u
left join {streamlookup} any ydb_conn_{table_name}.db as u
on(e.za = u.a AND e.yb = u.b)
;

Expand Down Expand Up @@ -501,12 +507,12 @@ def test_streamlookup(
database_id='local',
)

sql, messages = TESTCASES[testcase]
sql, messages, *options = TESTCASES[testcase]
sql = sql.format(
input_topic=self.input_topic,
output_topic=self.output_topic,
table_name=table_name,
streamlookup=R'/*+ streamlookup() */' if streamlookup else '',
streamlookup=Rf'/*+ streamlookup({" ".join(options)}) */' if streamlookup else '',
)

one_time_waiter.wait()
Expand Down
Loading