Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@
{
"Name": "TKqlStreamLookupIndex",
"Base": "TKqlLookupIndexBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"},
"Children": [
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
]
},
{
"Name": "TKqlEffectBase",
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,12 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) || TKqlStreamLookupTable::Match(node.Get()) ? 4 : 3, ctx)) {
const bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get()) || TKqlStreamLookupIndex::Match(node.Get());
if (isStreamLookup && !EnsureArgsCount(*node, TKqlStreamLookupIndex::Match(node.Get()) ? 5 : 4, ctx)) {
return TStatus::Error;
}

if (!isStreamLookup && !EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -495,9 +500,9 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
YQL_ENSURE(lookupType);

const TStructExprType* structType = nullptr;
bool isStreamLookup = TKqlStreamLookupTable::Match(node.Get());
if (isStreamLookup) {
auto lookupStrategy = node->Child(TKqlStreamLookupTable::idx_LookupStrategy);
auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ?
TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy);
if (!EnsureAtom(*lookupStrategy, ctx)) {
return TStatus::Error;
}
Expand Down
75 changes: 49 additions & 26 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,44 +412,67 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
}

TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!kqpCtx.IsScanQuery()) {
if (!node.Maybe<TKqlStreamLookupIndex>()) {
return node;
}

if (auto maybeStreamLookupIndex = node.Maybe<TKqlStreamLookupIndex>()) {
auto streamLookupIndex = maybeStreamLookupIndex.Cast();
auto streamLookupIndex = node.Maybe<TKqlStreamLookupIndex>().Cast();

const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());

const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
if (!needDataRead) {
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);

TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
if (!needDataRead) {
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Done();
}

return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(streamLookupIndex.Table())
.LookupKeys(lookupIndexTable.Ptr())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);

TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Done();

TMaybeNode<TExprBase> lookupKeys;
YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe<TCoAtom>());
TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) {
// Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>,
// expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>,
// so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>>
lookupKeys = Build<TCoMap>(ctx, node.Pos())
.Input(lookupIndexTable)
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("1").Build()
.Build()
.Add<TCoNth>()
.Tuple("tuple")
.Index().Value("0").Build()
.Build()
.Build()
.Build()
.Done();
} else {
lookupKeys = lookupIndexTable;
}

return node;
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(streamLookupIndex.Table())
.LookupKeys(lookupKeys.Cast())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Done();
}

/// Can push flat map node to read from table using only columns available in table description
Expand Down
42 changes: 27 additions & 15 deletions ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos,
.Columns(columns)
.Index()
.Build(indexName)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}

Expand Down Expand Up @@ -336,6 +337,7 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
const TDqJoin& join,
TExprBase leftInput,
const TString& indexName,
const TPrefixLookup& rightLookup,
const TKqpMatchReadResult& rightReadMatch,
TExprContext& ctx)
Expand Down Expand Up @@ -399,19 +401,30 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
? TKqpStreamLookupSemiJoinStrategyName
: TKqpStreamLookupJoinStrategyName;

TExprBase lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
.LookupStrategy().Build(strategy)
.Done();
TMaybeNode<TExprBase> lookupJoin;
if (indexName) {
lookupJoin = Build<TKqlStreamLookupIndex>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
.Index().Build(indexName)
.LookupStrategy().Build(strategy)
.Done();
} else {
lookupJoin = Build<TKqlStreamLookupTable>(ctx, join.Pos())
.Table(rightLookup.MainTable)
.LookupKeys(leftInput)
.Columns(lookupColumns.Cast())
.LookupStrategy().Build(strategy)
.Done();
}

// Stream lookup join output: stream<tuple<left_row_struct, optional<right_row_struct>>>
// so we should apply filters to second element of tuple for each row

if (extraRightFilter.IsValid()) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -433,7 +446,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.ExtractMembers) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -455,7 +468,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.FilterNullMembers) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -477,7 +490,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.SkipNullMembers) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -499,7 +512,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(

if (rightReadMatch.FlatMap) {
lookupJoin = Build<TCoMap>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.Lambda()
.Args({"tuple"})
.Body<TExprList>()
Expand All @@ -520,7 +533,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
}

return Build<TKqlIndexLookupJoin>(ctx, join.Pos())
.Input(lookupJoin)
.Input(lookupJoin.Cast())
.LeftLabel().Build(leftLabel)
.RightLabel().Build(rightLabel)
.JoinType(join.JoinType())
Expand Down Expand Up @@ -597,8 +610,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}

const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
&& !indexName;
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin;

auto leftRowArg = Build<TCoArgument>(ctx, join.Pos())
.Name("leftRowArg")
Expand Down Expand Up @@ -830,7 +842,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Build()
.Done();

return BuildKqpStreamIndexLookupJoin(join, leftInput, *prefixLookup, *rightReadMatch, ctx);
return BuildKqpStreamIndexLookupJoin(join, leftInput, indexName, *prefixLookup, *rightReadMatch, ctx);
}

auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
.LookupKeys(keys)
.Index(indexName.Cast())
.LookupKeys(keys)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
}
} else {
Expand Down
Loading