Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
"Name": "TKqlInsertOnConflictUpdateRows",
"Base": "TKqlUpsertRowsBase",
"Match": {"Type": "Callable", "Name": "KqlInsertOnConflictUpdateRows"},
"Children": [
{"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "GenerateColumnsIfInsert", "Type": "TCoAtomList"},
{"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
"Name": "TKqlUpsertRowsIndex",
"Base": "TKqlUpsertRowsBase",
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,12 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const
itemType = input->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType();
isStream = true;
} else {
YQL_ENSURE(TKqlUpsertRows::Match(node.Get()) || TKqlUpsertRowsIndex::Match(node.Get()));

YQL_ENSURE(
TKqlUpsertRows::Match(node.Get()) ||
TKqlUpsertRowsIndex::Match(node.Get()) ||
TKqlInsertOnConflictUpdateRows::Match(node.Get())
);

if (!EnsureListType(*input, ctx)) {
return TStatus::Error;
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,21 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC
const TCoAtomList& autoincrement,
const TKikimrTableDescription& table, TExprContext& ctx)
{
auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert");
YQL_ENSURE(generateColumnsIfInsertNode);
TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast<TCoAtomList>();

const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
if (generateColumnsIfInsert.Ref().ChildrenSize() > 0) {
return Build<TKqlInsertOnConflictUpdateRows>(ctx, write.Pos())
.Table(BuildTableMeta(table, write.Pos(), ctx))
.Input(input.Ptr())
.Columns(columns.Ptr())
.ReturningColumns(write.ReturningColumns())
.GenerateColumnsIfInsert(generateColumnsIfInsert)
.Done();
}

auto effect = Build<TKqlUpsertRows>(ctx, write.Pos())
.Table(BuildTableMeta(table, write.Pos(), ctx))
.Input(input.Ptr())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE
NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

NYql::NNodes::TExprBase KqpBuildUpdateStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);

Expand Down
235 changes: 235 additions & 0 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
#include "kqp_opt_phy_effects_rules.h"
#include "kqp_opt_phy_effects_impl.h"

using namespace NYql;
using namespace NYql::NNodes;

namespace NKikimr::NKqp::NOpt {

TMaybeNode<TDqPhyPrecompute> PrecomputeCurrentDefaultsForKeys(const TDqPhyPrecompute& lookupKeys,
const TCoAtomList& columnsWithDefault,
const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx)
{
TVector<TExprBase> lookupColumns;

for(const auto& key: table.Metadata->KeyColumnNames) {
auto atom = Build<TCoAtom>(ctx, pos)
.Value(key)
.Done();

lookupColumns.emplace_back(std::move(atom));
}

for(const auto& atom: columnsWithDefault) {
lookupColumns.push_back(atom);
}

auto lookupColumnsList = Build<TCoAtomList>(ctx, pos)
.Add(lookupColumns)
.Done();

auto lookupStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(lookupKeys)
.Build()
.Program()
.Args({"keys_list"})
.Body<TKqpLookupTable>()
.Table(BuildTableMeta(table, pos, ctx))
.LookupKeys<TCoIterator>()
.List("keys_list")
.Build()
.Columns(lookupColumnsList)
.Build()
.Build()
.Settings().Build()
.Done();

auto lookup = Build<TDqCnUnionAll>(ctx, pos)
.Output()
.Stage(lookupStage)
.Index().Build("0")
.Build()
.Done();

auto lookupPayloadSelector = MakeRowsPayloadSelector(lookupColumnsList, table, lookupKeys.Pos(), ctx);
auto condenseLookupResult = CondenseInputToDictByPk(lookup, table, lookupPayloadSelector, ctx);
if (!condenseLookupResult) {
return {};
}

auto computeDictStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(condenseLookupResult->StageInputs)
.Build()
.Program()
.Args(condenseLookupResult->StageArgs)
.Body(condenseLookupResult->Stream)
.Build()
.Settings().Build()
.Done();

return Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(computeDictStage)
.Index().Build("0")
.Build()
.Build()
.Done();
}

TCoAtomList BuildNonDefaultColumns(
const TKikimrTableDescription& table,
const TCoAtomList& allColumns,
const TCoAtomList& columnsWithDefault,
TPositionHandle pos, TExprContext& ctx)
{
TVector<TExprBase> columnsToUpdateSet;
std::unordered_set<TString> unchangedColumns;

for(const auto& column: columnsWithDefault) {
unchangedColumns.emplace(TString(column));
}

for(const TString& key: table.Metadata->KeyColumnNames) {
unchangedColumns.emplace(key);
}

for (const auto& column : allColumns) {
auto colName = TString(column);
auto it = unchangedColumns.find(colName);
if (it != unchangedColumns.end()) {
continue;
}

auto atom = Build<TCoAtom>(ctx, pos)
.Value(colName)
.Done();

columnsToUpdateSet.emplace_back(std::move(atom));
}

return Build<TCoAtomList>(ctx, pos)
.Add(columnsToUpdateSet)
.Done();
}

TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeInsertOnConlictUpdate = node.Maybe<TKqlInsertOnConflictUpdateRows>();
if (!maybeInsertOnConlictUpdate) {
return node;
}

auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast();
YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0);
TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert();

auto input = insertOnConlictUpdate.Input();
auto pos = insertOnConlictUpdate.Input().Pos();

const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path());

auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx);
auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx);
if (!condenseResult) {
return node;
}

auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx);
auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx);
if (!lookupDict) {
return node;
}

auto nonDefaultColumns = BuildNonDefaultColumns(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, pos, ctx);

auto inputKeysArg = TCoArgument(ctx.NewArgument(pos, "input_keys"));
auto inputDictArg = TCoArgument(ctx.NewArgument(pos, "input_dict"));
auto inputKeyArg = TCoArgument(ctx.NewArgument(pos, "input_key"));
auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "lookup_dict"));
auto presetHandlerPayload = TCoArgument(ctx.NewArgument(pos, "payload"));

auto filterStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputDictAndKeys.KeysPrecompute)
.Add(inputDictAndKeys.DictPrecompute)
.Add(lookupDict.Cast())
.Build()
.Program()
.Args({inputKeysArg, inputDictArg, lookupDictArg})
.Body<TCoIterator>()
.List<TCoMap>()
.Input(inputKeysArg)
.Lambda()
.Args(inputKeyArg)
.Body<TCoIfPresent>()
.Optional<TCoLookup>()
.Collection(lookupDictArg)
.Lookup(inputKeyArg)
.Build()
.PresentHandler<TCoLambda>()
.Args(presetHandlerPayload)
.Body<TCoFlattenMembers>()
.Add()
.Name().Build("")
.Value(presetHandlerPayload)
.Build()
.Add()
.Name().Build("")
.Value<TCoUnwrap>()
.Optional<TCoExtractMembers>()
.Input<TCoLookup>()
.Collection(inputDictArg)
.Lookup(inputKeyArg)
.Build()
.Members(nonDefaultColumns)
.Build()
.Build()
.Build()
.Add()
.Name().Build("")
.Value(inputKeyArg)
.Build()
.Build()
.Build()
.MissingValue<TCoFlattenMembers>()
.Add()
.Name().Build("")
.Value<TCoUnwrap>()
.Optional<TCoLookup>()
.Collection(inputDictArg)
.Lookup(inputKeyArg)
.Build()
.Build()
.Build()
.Add()
.Name().Build("")
.Value(inputKeyArg)
.Build()
.Build()
.Build()
.Build()
.Build()
.Build()
.Build()
.Settings().Build()
.Done();

auto newInput = Build<TDqCnUnionAll>(ctx, pos)
.Output()
.Stage(filterStage)
.Index().Build("0")
.Build()
.Done();

return Build<TKqlUpsertRows>(ctx, insertOnConlictUpdate.Pos())
.Input(newInput.Ptr())
.Table(insertOnConlictUpdate.Table())
.Columns(insertOnConlictUpdate.Columns())
.Settings(insertOnConlictUpdate.Settings())
.ReturningColumns(insertOnConlictUpdate.ReturningColumns())
.Done();
}

} // namespace NKikimr::NKqp::NOpt
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/physical/effects/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SRCS(
kqp_opt_phy_update.cpp
kqp_opt_phy_upsert_index.cpp
kqp_opt_phy_returning.cpp
kqp_opt_phy_upsert_defaults.cpp
)

PEERDIR(
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>));
AddHandler(0, &TKqlInsertRows::Match, HNDL(BuildInsertStages));
AddHandler(0, &TKqlUpdateRows::Match, HNDL(BuildUpdateStages));
AddHandler(0, &TKqlInsertOnConflictUpdateRows::Match, HNDL(RewriteGenerateIfInsert));
AddHandler(0, &TKqlUpdateRowsIndex::Match, HNDL(BuildUpdateIndexStages));
AddHandler(0, &TKqlUpsertRowsIndex::Match, HNDL(BuildUpsertIndexStages));
AddHandler(0, &TKqlInsertRowsIndex::Match, HNDL(BuildInsertIndexStages));
Expand Down Expand Up @@ -144,6 +145,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx);
DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx);
return output;
}

TMaybeNode<TExprBase> BuildReadTableStage(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpBuildReadTableStage(node, ctx, KqpCtx);
DumpAppliedRule("BuildReadTableStage", node.Ptr(), output.Ptr(), ctx);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
return SyncError();
} else if (constraint.Name().Value() == "default") {
auto columnBuild = indexBuildSettings.mutable_column_build_operation()->add_column();
columnBuild->SetColumnName(TString(constraint.Name().Value()));
columnBuild->SetColumnName(TString(columnName));
FillLiteralProto(constraint.Value().Cast<TCoDataCtor>(), *columnBuild->mutable_default_from_literal());
}
}
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
defaultConstraintColumnsSet.emplace(keyColumnName);
}

THashSet<TString> generateColumnsIfInsertColumnsSet;

for(const auto& [name, info] : table->Metadata->Columns) {
if (rowType->FindItem(name)) {
continue;
Expand All @@ -424,7 +426,15 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
continue;
}

if (defaultConstraintColumnsSet.find(name) != defaultConstraintColumnsSet.end()) {
continue;
}

if (info.IsDefaultKindDefined()) {
if (op == TYdbOperation::Upsert) {
generateColumnsIfInsertColumnsSet.emplace(name);
}

defaultConstraintColumnsSet.emplace(name);
}
}
Expand Down Expand Up @@ -485,6 +495,11 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
defaultConstraintColumns.push_back(ctx.NewAtom(node.Pos(), generatedColumn));
}

TExprNode::TListType generateColumnsIfInsert;
for(auto& generatedColumn: generateColumnsIfInsertColumnsSet) {
generateColumnsIfInsert.push_back(ctx.NewAtom(node.Pos(), generatedColumn));
}

node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build<TCoNameValueTupleList>(ctx, node.Pos())
.Add(node.Settings())
.Add()
Expand All @@ -499,6 +514,12 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
.Add(defaultConstraintColumns)
.Build()
.Build()
.Add()
.Name().Build("generate_columns_if_insert")
.Value<TCoAtomList>()
.Add(generateColumnsIfInsert)
.Build()
.Build()
.Done()
.Ptr();

Expand Down
Loading