Skip to content

Commit 4366d88

Browse files
authored
Allow UPDATE ON, UPSERT with partial set of input... (#894)
* Allow UPDATE ON, UPSERT with partial set of input... columns in case of unique index. Example: table: pk, fk1, fk2. Uniq index: fk1, fk2. To perform UPSERT INTO table (pk, fk1) we need to read missed value fk2 from main table to create lookup uniq index key. Right now we have two lookups in to main table - first one has been described above - second we need to make to remove old value from index. It will be fixed in a next step. * fix
1 parent bd18c42 commit 4366d88

9 files changed

+1105
-71
lines changed

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ TMaybe<TCondenseInputResult> CondenseInput(const TExprBase& input, TExprContext&
7777
TVector<TCoArgument> stageArguments;
7878

7979
if (IsDqPureExpr(input)) {
80-
YQL_ENSURE(input.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List, "" << input.Ref().Dump());
8180
auto stream = Build<TCoToStream>(ctx, input.Pos())
8281
.Input<TCoJust>()
8382
.Input(input)

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h

+8
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,19 @@ TMaybe<TCondenseInputResult> CondenseInputToDictByPk(const NYql::NNodes::TExprBa
3030
const NYql::TKikimrTableDescription& table, const NYql::NNodes::TCoLambda& payloadSelector,
3131
NYql::TExprContext& ctx);
3232

33+
NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDict(
34+
const NYql::NNodes::TDqPhyPrecompute& lookupKeys, const NYql::TKikimrTableDescription& table,
35+
const TVector<NYql::NNodes::TExprBase>& columnsList,
36+
NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool fixLookupKeys);
37+
3338
NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDict(
3439
const NYql::NNodes::TDqPhyPrecompute& lookupKeys, const NYql::TKikimrTableDescription& table,
3540
const THashSet<TString>& dataColumns, const THashSet<TString>& keyColumns, NYql::TPositionHandle pos,
3641
NYql::TExprContext& ctx);
3742

43+
NYql::NNodes::TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& condenseResult,
44+
NYql::TPositionHandle pos, NYql::TExprContext& ctx);
45+
3846
// Creates key selector using PK of given table
3947
NYql::NNodes::TCoLambda MakeTableKeySelector(const NYql::TKikimrTableMetadataPtr tableMeta, NYql::TPositionHandle pos,
4048
NYql::TExprContext& ctx, TMaybe<int> tupleId = {});

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp

+80-13
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ TVector<TExprBase> CreateColumnsToSelectToUpdateIndex(
4848
return columnsToSelect;
4949
}
5050

51-
TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) {
51+
} // namespace
52+
53+
TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& condenseResult,
54+
TPositionHandle pos, TExprContext& ctx)
55+
{
5256
auto computeDictStage = Build<TDqStage>(ctx, pos)
5357
.Inputs()
5458
.Add(condenseResult.StageInputs)
@@ -70,8 +74,6 @@ TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPos
7074
.Done();
7175
}
7276

73-
} // namespace
74-
7577
TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector(
7678
const TKikimrTableDescription& table,
7779
TPositionHandle pos,
@@ -127,26 +129,81 @@ TSecondaryIndexes BuildSecondaryIndexVector(const TKikimrTableDescription& table
127129
}
128130

129131
TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
130-
const TKikimrTableDescription& table, const THashSet<TString>& dataColumns,
131-
const THashSet<TString>& keyColumns, TPositionHandle pos, TExprContext& ctx)
132+
const TKikimrTableDescription& table, const TVector<TExprBase>& columnsList,
133+
TPositionHandle pos, TExprContext& ctx, bool fixLookupKeys)
132134
{
133-
auto lookupColumns = CreateColumnsToSelectToUpdateIndex(table.Metadata->KeyColumnNames, dataColumns,
134-
keyColumns, pos, ctx);
135-
136135
auto lookupColumnsList = Build<TCoAtomList>(ctx, pos)
137-
.Add(lookupColumns)
136+
.Add(columnsList)
138137
.Done();
139138

139+
TExprNode::TPtr keys;
140+
141+
// we need to left only table key columns to perform lookup
142+
// unfortunately we can't do it inside lookup stage
143+
if (fixLookupKeys) {
144+
auto keyArg = TCoArgument(ctx.NewArgument(pos, "key"));
145+
auto keysList = TCoArgument(ctx.NewArgument(pos, "keys_list"));
146+
TVector<TExprBase> keyLookupTuples;
147+
keyLookupTuples.reserve(table.Metadata->KeyColumnNames.size());
148+
149+
for (const auto& key : table.Metadata->KeyColumnNames) {
150+
keyLookupTuples.emplace_back(
151+
Build<TCoNameValueTuple>(ctx, pos)
152+
.Name().Build(key)
153+
.Value<TCoMember>()
154+
.Struct(keyArg)
155+
.Name().Build(key)
156+
.Build()
157+
.Done());
158+
}
159+
160+
auto list = Build<TCoToStream>(ctx, pos)
161+
.Input<TCoJust>()
162+
.Input<TCoMap>()
163+
.Input(keysList)
164+
.Lambda()
165+
.Args({keyArg})
166+
.Body<TCoAsStruct>()
167+
.Add(keyLookupTuples)
168+
.Build()
169+
.Build()
170+
.Build()
171+
.Build()
172+
.Done().Ptr();
173+
174+
keys = Build<TDqStage>(ctx, pos)
175+
.Inputs()
176+
.Add(lookupKeys)
177+
.Build()
178+
.Program()
179+
.Args({keysList})
180+
.Body(list)
181+
.Build()
182+
.Settings().Build()
183+
.Done().Ptr();
184+
185+
keys = Build<TDqPhyPrecompute>(ctx, pos)
186+
.Connection<TDqCnValue>()
187+
.Output()
188+
.Stage(keys)
189+
.Index().Build("0")
190+
.Build()
191+
.Build()
192+
.Done().Ptr();
193+
} else {
194+
keys = lookupKeys.Ptr();
195+
}
196+
140197
auto lookupStage = Build<TDqStage>(ctx, pos)
141198
.Inputs()
142-
.Add(lookupKeys)
199+
.Add(keys)
143200
.Build()
144201
.Program()
145-
.Args({"keys_list"})
202+
.Args({"keys_stage_arg"})
146203
.Body<TKqpLookupTable>()
147204
.Table(BuildTableMeta(table, pos, ctx))
148205
.LookupKeys<TCoIterator>()
149-
.List("keys_list")
206+
.List("keys_stage_arg")
150207
.Build()
151208
.Columns(lookupColumnsList)
152209
.Build()
@@ -167,7 +224,17 @@ TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& l
167224
return {};
168225
}
169226

170-
return PrecomputeDict(*condenseLookupResult, lookupKeys.Pos(), ctx);
227+
return PrecomputeCondenseInputResult(*condenseLookupResult, lookupKeys.Pos(), ctx);
228+
}
229+
230+
TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
231+
const TKikimrTableDescription& table, const THashSet<TString>& dataColumns,
232+
const THashSet<TString>& keyColumns, TPositionHandle pos, TExprContext& ctx)
233+
{
234+
auto lookupColumns = CreateColumnsToSelectToUpdateIndex(table.Metadata->KeyColumnNames, dataColumns,
235+
keyColumns, pos, ctx);
236+
237+
return PrecomputeTableLookupDict(lookupKeys, table, lookupColumns, pos, ctx, false);
171238
}
172239

173240
TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp

+45-10
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,17 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
264264
// Compatibility with PG semantic - allow multiple null in columns with unique constaint
265265
TVector<TCoAtom> skipNullColumns;
266266
skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());
267+
268+
bool used = false;
267269
for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
268-
if (!inputColumns || inputColumns->contains(column)) {
269-
TCoAtom atom(ctx.NewAtom(pos, column));
270-
skipNullColumns.emplace_back(atom);
271-
}
270+
used |= (!inputColumns || inputColumns->contains(column));
271+
TCoAtom atom(ctx.NewAtom(pos, column));
272+
skipNullColumns.emplace_back(atom);
272273
}
273274

274-
//no columns to skip -> no index columns to check -> skip check
275-
if (skipNullColumns.empty()) {
276-
continue;
277-
}
275+
// Just to doublecheck we are not trying to update index without data to update
276+
YQL_ENSURE(used, "Index is used but not input columns for update. Probably it's a bug."
277+
" Index: " << table.Metadata->Indexes[i].Name);
278278

279279
auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
280280
.Input(rowsListArg)
@@ -292,11 +292,46 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
292292
return checks;
293293
}
294294

295+
static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const THashSet<TStringBuf>* inputColumns,
296+
TPositionHandle pos, TExprContext& ctx)
297+
{
298+
if (!inputColumns) {
299+
return rowsListArg.Ptr();
300+
}
301+
302+
auto arg = TCoArgument(ctx.NewArgument(pos, "arg"));
303+
304+
TVector<TExprBase> columns;
305+
columns.reserve(inputColumns->size());
306+
307+
for (const auto x : *inputColumns) {
308+
columns.emplace_back(
309+
Build<TCoNameValueTuple>(ctx, pos)
310+
.Name().Build(x)
311+
.Value<TCoMember>()
312+
.Struct(arg)
313+
.Name().Build(x)
314+
.Build()
315+
.Done());
316+
}
317+
318+
return Build<TCoMap>(ctx, pos)
319+
.Input(rowsListArg)
320+
.Lambda()
321+
.Args({arg})
322+
.Body<TCoAsStruct>()
323+
.Add(columns)
324+
.Build()
325+
.Build()
326+
.Done().Ptr();
327+
}
328+
295329
TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes,
296330
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
297331
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
298332
, False(MakeBool(pos, false, ctx))
299333
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck))
334+
, RowsToPass(CreateRowsToPass(RowsListArg, inputColumns, pos, ctx))
300335
{}
301336

302337
TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector,
@@ -340,7 +375,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co
340375

341376
types.emplace_back(
342377
Build<TCoTypeOf>(ctx, pos)
343-
.Value(RowsListArg)
378+
.Value(RowsToPass)
344379
.Done()
345380
);
346381

@@ -376,7 +411,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co
376411

377412
variants.emplace_back(
378413
Build<TCoVariant>(ctx, pos)
379-
.Item(RowsListArg)
414+
.Item(RowsToPass)
380415
.Index().Build("0")
381416
.VarType(variantType)
382417
.Done()

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class TUniqBuildHelper {
7676
const NYql::TExprNode::TPtr False;
7777
private:
7878
const TChecks Checks;
79+
const NYql::TExprNode::TPtr RowsToPass;
7980
};
8081

8182
TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,

0 commit comments

Comments
 (0)