Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx);
TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx, *getParents());
DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx);
return output;
}
Expand Down
162 changes: 140 additions & 22 deletions ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,175 @@

#include <ydb/core/kqp/opt/kqp_opt_impl.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/library/yql/core/yql_opt_utils.h>

namespace {

bool CanPushFlatMap(const NYql::NNodes::TCoFlatMapBase& flatMap, const NYql::TKikimrTableDescription& tableDesc, const NYql::TParentsMap& parentsMap, TVector<TString> & extraColumns) {
auto flatMapLambda = flatMap.Lambda();
if (!NYql::IsFilterFlatMap(flatMapLambda)) {
return false;
}

const auto & flatMapLambdaArgument = flatMapLambda.Args().Arg(0).Ref();
auto flatMapLambdaConditional = flatMapLambda.Body().Cast<NYql::NNodes::TCoConditionalValueBase>();

TSet<TString> lambdaSubset;
auto isSubSet = HaveFieldsSubset(flatMapLambdaConditional.Predicate().Ptr(), flatMapLambdaArgument, lambdaSubset, parentsMap, true);
auto argType = NYql::RemoveOptionalType(flatMapLambdaArgument.GetTypeAnn());
if (argType->GetKind() != NYql::ETypeAnnotationKind::Struct) {
return false;
}
// helper doesn't accept if all columns are used
if (!isSubSet && lambdaSubset.size() != argType->Cast<NYql::TStructExprType>()->GetSize()) {
return false;
}

for (auto & lambdaColumn : lambdaSubset) {
auto columnIndex = tableDesc.GetKeyColumnIndex(lambdaColumn);
if (!columnIndex) {
return false;
}
}

extraColumns.insert(extraColumns.end(), lambdaSubset.begin(), lambdaSubset.end());
return true;
}

}

namespace NKikimr::NKqp::NOpt {

using namespace NYql;
using namespace NYql::NNodes;

TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext &kqpCtx) {
TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext &kqpCtx, const NYql::TParentsMap& parentsMap) {
if (!node.Maybe<TKqlDeleteRows>()) {
return node;
}

auto deleteRows = node.Cast<TKqlDeleteRows>();

TMaybeNode<TCoFlatMap> filter;

TMaybeNode<TKqlLookupTableBase> lookup;
TMaybeNode<TKqlReadTable> read;
TMaybeNode<TCoSkipNullMembers> skipNulMembers;
TMaybeNode<TKqlReadTableRanges> readranges;

if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
} else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) {
skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>();
lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>();
} else if (deleteRows.Input().Maybe<TKqlReadTable>()) {
read = deleteRows.Input().Cast<TKqlReadTable>();
} else {
return node;
}

YQL_ENSURE(lookup);
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
return node;
TMaybeNode<TExprBase> input = deleteRows.Input();
if (input.Maybe<TCoFlatMap>()) {
filter = deleteRows.Input().Cast<TCoFlatMap>();
input = filter.Input();
}
readranges = input.Maybe<TKqlReadTableRanges>();
if (!readranges) {
return node;
}
}

auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
YQL_ENSURE(lookupKeyType);

// Only consider complete PK lookups
TMaybeNode<TExprBase> deleteInput;
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, deleteRows.Table().Path());
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

TExprBase deleteInput = lookup.Cast().LookupKeys();
if (skipNulMembers) {
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
.Input(deleteInput)
.Members(skipNulMembers.Cast().Members())
if (lookup) {
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
return node;
}

auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
YQL_ENSURE(lookupKeyType);

// Only consider complete PK lookups
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

deleteInput = lookup.Cast().LookupKeys();
if (skipNulMembers) {
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
.Input(deleteInput.Cast())
.Members(skipNulMembers.Cast().Members())
.Done();
}
} else if (read) {
if (deleteRows.Table().Raw() != read.Cast().Table().Raw()) {
return node;
}

const auto& rangeFrom = read.Cast().Range().From();
const auto& rangeTo = read.Cast().Range().To();

if (!rangeFrom.Maybe<TKqlKeyInc>() || !rangeTo.Maybe<TKqlKeyInc>()) {
return node;
}

if (rangeFrom.Raw() != rangeTo.Raw()) {
return node;
}

if (rangeFrom.ArgCount() != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

TVector<TExprBase> structMembers;
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
TCoAtom columnNameAtom(ctx.NewAtom(node.Pos(), tableDesc.Metadata->KeyColumnNames[i]));

auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
.Name(columnNameAtom)
.Value(rangeFrom.Arg(i))
.Done();

structMembers.push_back(member);
}

deleteInput = Build<TCoAsList>(ctx, node.Pos())
.Add<TCoAsStruct>()
.Add(structMembers)
.Build()
.Done();
} else if (readranges) {
if (deleteRows.Table().Raw() != readranges.Cast().Table().Raw()) {
return node;
}

if (!readranges.Cast().PrefixPointsExpr()) {
return node;
}

const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readranges.Cast().Table().Path().Value());
auto hint = TKqpReadTableExplainPrompt::Parse(readranges.Cast().ExplainPrompt());
if (hint.PointPrefixLen != tableDesc.Metadata->KeyColumnNames.size()) {
return node;
}

if (filter) {
TVector<TString> extraColumns;
if (!CanPushFlatMap(filter.Cast(), tableDesc, parentsMap, extraColumns)) {
return node;
}
deleteInput = Build<TCoFlatMap>(ctx, node.Pos())
.Lambda(filter.Lambda().Cast())
.Input(readranges.PrefixPointsExpr().Cast())
.Done();
} else {
deleteInput = readranges.PrefixPointsExpr();
}
}

YQL_ENSURE(deleteInput);

return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
.Table(deleteRows.Table())
.Input(deleteInput)
.Input(deleteInput.Cast())
.Done();
}

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
.Done();
}

if (auto readRange = node.Maybe<TKqlReadTableRanges>()) {
return Build<TKqlReadTableRanges>(ctx, read.Pos())
.Table(read.Table())
.Ranges(read.Ranges())
.Columns(usedColumns.Cast())
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.PrefixPointsExpr(readRange.PrefixPointsExpr())
.Done();
}

return Build<TKqlReadTableRangesBase>(ctx, read.Pos())
.CallableName(read.CallableName())
.Table(read.Table())
Expand Down
56 changes: 53 additions & 3 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,59 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
lookupKeys = skipNullMembers.Input();
}

TKqpReadTableSettings settings;
if (skipNullMembers) {
auto skipNullColumns = skipNullMembers.Cast().Members();

if (skipNullColumns) {
for (const auto &column : skipNullColumns.Cast()) {
settings.AddSkipNullKey(TString(column.Value()));
}
}
}

const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
if (auto lookupKeysFlatMap = lookupKeys.Maybe<TCoFlatMapBase>()) {
auto flatMapRangeInput = lookupKeysFlatMap.Cast().Input().Maybe<TCoRangeFinalize>();

// This rule should depend on feature flag for safety
if (!flatMapRangeInput || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
return {};
}

auto lookupKeysType = lookupKeys.Ref().GetTypeAnn();
YQL_ENSURE(lookupKeysType);
YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List);
auto itemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
YQL_ENSURE(itemType->GetKind() == ETypeAnnotationKind::Struct);
auto structType = itemType->Cast<TStructExprType>();

TVector<TString> usedColumns;
usedColumns.reserve(structType->GetSize());
for (const auto& keyColumnName : table.Metadata->KeyColumnNames) {
if (!structType->FindItem(keyColumnName)) {
break;
}

usedColumns.emplace_back(keyColumnName);
}

YQL_ENSURE(usedColumns.size() == structType->GetSize());

TKqpReadTableExplainPrompt prompt;
prompt.SetUsedKeyColumns(std::move(usedColumns));
prompt.SetPointPrefixLen(structType->GetSize());


return Build<TKqlReadTableRanges>(ctx, lookup.Pos())
.Table(lookup.Table())
.Ranges(flatMapRangeInput.Cast())
.Columns(lookup.Columns())
.Settings(settings.BuildNode(ctx, lookup.Pos()))
.ExplainPrompt(prompt.BuildNode(ctx, lookup.Pos()))
.Done();
}

auto maybeAsList = lookupKeys.Maybe<TCoAsList>();
if (!maybeAsList) {
return {};
Expand All @@ -369,7 +422,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
}

// full pk expected
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
if (table.Metadata->KeyColumnNames.size() != maybeStruct.Cast().ArgCount()) {
return {};
}
Expand All @@ -380,7 +432,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
keyColumnsStruct.insert({TString(tuple.Name().Value()), tuple.Value().Cast()});
}

TKqpReadTableSettings settings;
TVector<TExprBase> keyValues;
keyValues.reserve(maybeStruct.Cast().ArgCount());
for (const auto& name : table.Metadata->KeyColumnNames) {
Expand All @@ -396,7 +447,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
for (const auto &column : skipNullColumns.Cast()) {
settings.AddSkipNullKey(TString(column.Value()));
}

}
}

Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,6 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
.Done();
}
}
} else if (buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size()) {
YQL_ENSURE(prefixPointsExpr);
residualLambda = pointsExtractionResult.PrunedLambda;
buildLookup(prefixPointsExpr, input);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ NYql::NNodes::TExprBase KqpRewriteTakeOverIndexRead(const NYql::NNodes::TExprBas
const TKqpOptimizeContext& kqpCtx, const NYql::TParentsMap& parentsMap);

NYql::NNodes::TExprBase KqpDeleteOverLookup(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
const TKqpOptimizeContext &kqpCtx);
const TKqpOptimizeContext &kqpCtx, const NYql::TParentsMap& parentsMap);

NYql::NNodes::TExprBase KqpExcessUpsertInputColumns(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx);

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
if (intersection == 0) {
newShard->AddPoint(std::move(points[pointIndex]));
CA_LOG_D("Add point to new shardId: " << partition.ShardId);
}
if (intersection < 0) {
} else {
YQL_ENSURE(intersection > 0, "Missed intersection of point and partition ranges.");
break;
}
pointIndex += 1;
Expand Down
Loading