Skip to content

Allow UPDATE ON, UPSERT with partial set of input... #894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ TMaybe<TCondenseInputResult> CondenseInput(const TExprBase& input, TExprContext&
TVector<TCoArgument> stageArguments;

if (IsDqPureExpr(input)) {
YQL_ENSURE(input.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List, "" << input.Ref().Dump());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А что там если не List?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Там про использовании CondenseInput из сложных оптимайзеров, еще type анотация не прошла. Оно просто тут сегфолтится

auto stream = Build<TCoToStream>(ctx, input.Pos())
.Input<TCoJust>()
.Input(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ TMaybe<TCondenseInputResult> CondenseInputToDictByPk(const NYql::NNodes::TExprBa
const NYql::TKikimrTableDescription& table, const NYql::NNodes::TCoLambda& payloadSelector,
NYql::TExprContext& ctx);

NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDict(
const NYql::NNodes::TDqPhyPrecompute& lookupKeys, const NYql::TKikimrTableDescription& table,
const TVector<NYql::NNodes::TExprBase>& columnsList,
NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool fixLookupKeys);

NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDict(
const NYql::NNodes::TDqPhyPrecompute& lookupKeys, const NYql::TKikimrTableDescription& table,
const THashSet<TString>& dataColumns, const THashSet<TString>& keyColumns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx);

NYql::NNodes::TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& condenseResult,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);

// Creates key selector using PK of given table
NYql::NNodes::TCoLambda MakeTableKeySelector(const NYql::TKikimrTableMetadataPtr tableMeta, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, TMaybe<int> tupleId = {});
Expand Down
93 changes: 80 additions & 13 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ TVector<TExprBase> CreateColumnsToSelectToUpdateIndex(
return columnsToSelect;
}

TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) {
} // namespace

TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& condenseResult,
TPositionHandle pos, TExprContext& ctx)
{
auto computeDictStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(condenseResult.StageInputs)
Expand All @@ -70,8 +74,6 @@ TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPos
.Done();
}

} // namespace

TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector(
const TKikimrTableDescription& table,
TPositionHandle pos,
Expand Down Expand Up @@ -127,26 +129,81 @@ TSecondaryIndexes BuildSecondaryIndexVector(const TKikimrTableDescription& table
}

TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
const TKikimrTableDescription& table, const THashSet<TString>& dataColumns,
const THashSet<TString>& keyColumns, TPositionHandle pos, TExprContext& ctx)
const TKikimrTableDescription& table, const TVector<TExprBase>& columnsList,
TPositionHandle pos, TExprContext& ctx, bool fixLookupKeys)
{
auto lookupColumns = CreateColumnsToSelectToUpdateIndex(table.Metadata->KeyColumnNames, dataColumns,
keyColumns, pos, ctx);

auto lookupColumnsList = Build<TCoAtomList>(ctx, pos)
.Add(lookupColumns)
.Add(columnsList)
.Done();

TExprNode::TPtr keys;

// we need to left only table key columns to perform lookup
// unfortunately we can't do it inside lookup stage
if (fixLookupKeys) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Добавь плз комментарий, что значит fixLookupKeys, но очевидно.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ага. Продублирую тут
// we need to left only table key columns to perform lookup
// unfortunately we can't do it inside lookup stage

auto keyArg = TCoArgument(ctx.NewArgument(pos, "key"));
auto keysList = TCoArgument(ctx.NewArgument(pos, "keys_list"));
TVector<TExprBase> keyLookupTuples;
keyLookupTuples.reserve(table.Metadata->KeyColumnNames.size());

for (const auto& key : table.Metadata->KeyColumnNames) {
keyLookupTuples.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(key)
.Value<TCoMember>()
.Struct(keyArg)
.Name().Build(key)
.Build()
.Done());
}

auto list = Build<TCoToStream>(ctx, pos)
.Input<TCoJust>()
.Input<TCoMap>()
.Input(keysList)
.Lambda()
.Args({keyArg})
.Body<TCoAsStruct>()
.Add(keyLookupTuples)
.Build()
.Build()
.Build()
.Build()
.Done().Ptr();

keys = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(lookupKeys)
.Build()
.Program()
.Args({keysList})
.Body(list)
.Build()
.Settings().Build()
.Done().Ptr();

keys = Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(keys)
.Index().Build("0")
.Build()
.Build()
.Done().Ptr();
} else {
keys = lookupKeys.Ptr();
}

auto lookupStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(lookupKeys)
.Add(keys)
.Build()
.Program()
.Args({"keys_list"})
.Args({"keys_stage_arg"})
.Body<TKqpLookupTable>()
.Table(BuildTableMeta(table, pos, ctx))
.LookupKeys<TCoIterator>()
.List("keys_list")
.List("keys_stage_arg")
.Build()
.Columns(lookupColumnsList)
.Build()
Expand All @@ -167,7 +224,17 @@ TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& l
return {};
}

return PrecomputeDict(*condenseLookupResult, lookupKeys.Pos(), ctx);
return PrecomputeCondenseInputResult(*condenseLookupResult, lookupKeys.Pos(), ctx);
}

TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
const TKikimrTableDescription& table, const THashSet<TString>& dataColumns,
const THashSet<TString>& keyColumns, TPositionHandle pos, TExprContext& ctx)
{
auto lookupColumns = CreateColumnsToSelectToUpdateIndex(table.Metadata->KeyColumnNames, dataColumns,
keyColumns, pos, ctx);

return PrecomputeTableLookupDict(lookupKeys, table, lookupColumns, pos, ctx, false);
}

TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,
Expand Down
55 changes: 45 additions & 10 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,17 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
// Compatibility with PG semantic - allow multiple null in columns with unique constaint
TVector<TCoAtom> skipNullColumns;
skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());

bool used = false;
for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
if (!inputColumns || inputColumns->contains(column)) {
TCoAtom atom(ctx.NewAtom(pos, column));
skipNullColumns.emplace_back(atom);
}
used |= (!inputColumns || inputColumns->contains(column));
TCoAtom atom(ctx.NewAtom(pos, column));
skipNullColumns.emplace_back(atom);
}

//no columns to skip -> no index columns to check -> skip check
if (skipNullColumns.empty()) {
continue;
}
// Just to doublecheck we are not trying to update index without data to update
YQL_ENSURE(used, "Index is used but not input columns for update. Probably it's a bug."
" Index: " << table.Metadata->Indexes[i].Name);

auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
.Input(rowsListArg)
Expand All @@ -292,11 +292,46 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
return checks;
}

static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const THashSet<TStringBuf>* inputColumns,
TPositionHandle pos, TExprContext& ctx)
{
if (!inputColumns) {
return rowsListArg.Ptr();
}

auto arg = TCoArgument(ctx.NewArgument(pos, "arg"));

TVector<TExprBase> columns;
columns.reserve(inputColumns->size());

for (const auto x : *inputColumns) {
columns.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(x)
.Value<TCoMember>()
.Struct(arg)
.Name().Build(x)
.Build()
.Done());
}

return Build<TCoMap>(ctx, pos)
.Input(rowsListArg)
.Lambda()
.Args({arg})
.Body<TCoAsStruct>()
.Add(columns)
.Build()
.Build()
.Done().Ptr();
}

TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes,
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
, False(MakeBool(pos, false, ctx))
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck))
, RowsToPass(CreateRowsToPass(RowsListArg, inputColumns, pos, ctx))
{}

TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector,
Expand Down Expand Up @@ -340,7 +375,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co

types.emplace_back(
Build<TCoTypeOf>(ctx, pos)
.Value(RowsListArg)
.Value(RowsToPass)
.Done()
);

Expand Down Expand Up @@ -376,7 +411,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co

variants.emplace_back(
Build<TCoVariant>(ctx, pos)
.Item(RowsListArg)
.Item(RowsToPass)
.Index().Build("0")
.VarType(variantType)
.Done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TUniqBuildHelper {
const NYql::TExprNode::TPtr False;
private:
const TChecks Checks;
const NYql::TExprNode::TPtr RowsToPass;
};

TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
Expand Down
Loading