Skip to content

Commit 2b89c7d

Browse files
dcherednikqrort
andauthored
Allow partial column set in case of insert (#1056)
* Allow partial column set in case of insert into table with unique constraint. Co-authored-by: qrort <31865255+qrort@users.noreply.github.com>
1 parent 39325bd commit 2b89c7d

File tree

9 files changed

+275
-37
lines changed

9 files changed

+275
-37
lines changed

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ NYql::NNodes::TExprBase MakeRowsFromTupleDict(const NYql::NNodes::TDqPhyPrecompu
6969
const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
7070

7171
NYql::NNodes::TMaybeNode<NYql::NNodes::TDqCnUnionAll> MakeConditionalInsertRows(const NYql::NNodes::TExprBase& input,
72-
const NYql::TKikimrTableDescription& table, bool abortOnError, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
72+
const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumn, bool abortOnError,
73+
NYql::TPositionHandle pos, NYql::TExprContext& ctx);
7374

7475
enum class TKqpPhyUpsertIndexMode {
7576
Upsert,

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ using namespace NYql::NDq;
99
using namespace NYql::NNodes;
1010

1111
TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, const TKikimrTableDescription& table,
12-
bool abortOnError, TPositionHandle pos, TExprContext& ctx)
12+
const TMaybe<THashSet<TStringBuf>>& inputColumns, bool abortOnError, TPositionHandle pos, TExprContext& ctx)
1313
{
1414
auto condenseResult = CondenseInput(input, ctx);
1515
if (!condenseResult) {
1616
return {};
1717
}
1818

19-
auto helper = CreateInsertUniqBuildHelper(table, pos, ctx);
19+
auto helper = CreateInsertUniqBuildHelper(table, inputColumns, pos, ctx);
2020
auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
2121

2222
auto inputPrecompute = helper->CreateInputPrecompute(computeKeysStage, pos, ctx);
@@ -127,7 +127,8 @@ TExprBase KqpBuildInsertStages(TExprBase node, TExprContext& ctx, const TKqpOpti
127127
bool abortOnError = insert.OnConflict().Value() == "abort"sv;
128128
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insert.Table().Path());
129129

130-
auto insertRows = MakeConditionalInsertRows(insert.Input(), table, abortOnError, insert.Pos(), ctx);
130+
const static TMaybe<THashSet<TStringBuf>> empty;
131+
auto insertRows = MakeConditionalInsertRows(insert.Input(), table, empty, abortOnError, insert.Pos(), ctx);
131132
if (!insertRows) {
132133
return node;
133134
}

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
8585
bool abortOnError = insert.OnConflict().Value() == "abort"sv;
8686
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insert.Table().Path());
8787

88-
auto insertRows = MakeConditionalInsertRows(insert.Input(), table, abortOnError, insert.Pos(), ctx);
88+
THashSet<TStringBuf> inputColumnsSet;
89+
for (const auto& column : insert.Columns()) {
90+
inputColumnsSet.emplace(column.Value());
91+
}
92+
93+
auto insertRows = MakeConditionalInsertRows(insert.Input(), table, inputColumnsSet, abortOnError, insert.Pos(), ctx);
8994
if (!insertRows) {
9095
return node;
9196
}
@@ -94,11 +99,6 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
9499
.Connection(insertRows.Cast())
95100
.Done();
96101

97-
THashSet<TStringBuf> inputColumnsSet;
98-
for (const auto& column : insert.Columns()) {
99-
inputColumnsSet.emplace(column.Value());
100-
}
101-
102102
auto indexes = BuildSecondaryIndexVector(table, insert.Pos(), ctx);
103103
YQL_ENSURE(indexes);
104104

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector,
3939

4040
class TInsertUniqBuildHelper : public TUniqBuildHelper {
4141
public:
42-
TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
43-
NYql::TExprContext& ctx)
44-
: TUniqBuildHelper(table, nullptr, nullptr, pos, ctx, false)
42+
TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
43+
NYql::TPositionHandle pos, NYql::TExprContext& ctx)
44+
: TUniqBuildHelper(table, inputColumns, nullptr, pos, ctx, true)
4545
{}
4646

4747
private:
@@ -119,9 +119,9 @@ class TInsertUniqBuildHelper : public TUniqBuildHelper {
119119

120120
class TUpsertUniqBuildHelper : public TUniqBuildHelper {
121121
public:
122-
TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>& usedIndexes,
122+
TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns, const THashSet<TString>& usedIndexes,
123123
NYql::TPositionHandle pos, NYql::TExprContext& ctx)
124-
: TUniqBuildHelper(table, inputColumns, &usedIndexes, pos, ctx, true)
124+
: TUniqBuildHelper(table, inputColumns, &usedIndexes, pos, ctx, false)
125125
, PkDict(MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), RowsListArg, pos, ctx))
126126
{}
127127

@@ -243,12 +243,12 @@ class TUpsertUniqBuildHelper : public TUniqBuildHelper {
243243
}
244244

245245
TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg,
246-
const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns,
247-
const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
246+
const TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
247+
const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool insertMode)
248248
{
249249
TVector<TUniqCheckNodes> checks;
250250

251-
if (!skipPkCheck) {
251+
if (insertMode) {
252252
checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx));
253253
}
254254

@@ -262,19 +262,34 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
262262
continue;
263263

264264
// Compatibility with PG semantic - allow multiple null in columns with unique constaint
265+
// NOTE: to change this it's important to consider insert and replace in case of partial column set
265266
TVector<TCoAtom> skipNullColumns;
266267
skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());
267268

268269
bool used = false;
270+
bool skip = false;
271+
272+
YQL_ENSURE(inputColumns, "Attempt to check uniq constraint without given columns");
273+
269274
for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
270-
used |= (!inputColumns || inputColumns->contains(column));
275+
if (inputColumns->contains(column)) {
276+
// Skip check if no input for index update
277+
used = true;
278+
} else if (insertMode) {
279+
// In case of insert, 'column' will contain NULL for the new PK (or query will fail in case of NOT NULL)
280+
// NULL != NULL and NULL != "any other value" so we can just skip uniq check.
281+
skip = true;
282+
continue;
283+
}
271284
TCoAtom atom(ctx.NewAtom(pos, column));
272285
skipNullColumns.emplace_back(atom);
273286
}
274287

275-
// Just to doublecheck we are not trying to update index without data to update
276-
YQL_ENSURE(used, "Index is used but not input columns for update. Probably it's a bug."
277-
" Index: " << table.Metadata->Indexes[i].Name);
288+
if (!used)
289+
continue;
290+
291+
if (skip)
292+
continue;
278293

279294
auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
280295
.Input(rowsListArg)
@@ -292,7 +307,7 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
292307
return checks;
293308
}
294309

295-
static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const THashSet<TStringBuf>* inputColumns,
310+
static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const TMaybe<THashSet<TStringBuf>>& inputColumns,
296311
TPositionHandle pos, TExprContext& ctx)
297312
{
298313
if (!inputColumns) {
@@ -326,11 +341,11 @@ static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const TH
326341
.Done().Ptr();
327342
}
328343

329-
TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes,
330-
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
344+
TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns, const THashSet<TString>* usedIndexes,
345+
TPositionHandle pos, TExprContext& ctx, bool insertMode)
331346
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
332347
, False(MakeBool(pos, false, ctx))
333-
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck))
348+
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, insertMode))
334349
, RowsToPass(CreateRowsToPass(RowsListArg, inputColumns, pos, ctx))
335350
{}
336351

@@ -548,14 +563,15 @@ TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysSta
548563
namespace NKikimr::NKqp::NOpt {
549564

550565

551-
TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
552-
NYql::TExprContext& ctx)
566+
TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
567+
const TMaybe<THashSet<TStringBuf>>& inputColumns, NYql::TPositionHandle pos,
568+
NYql::TExprContext& ctx)
553569
{
554-
return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx);
570+
return std::make_unique<TInsertUniqBuildHelper>(table, inputColumns, pos, ctx);
555571
}
556572

557573
TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
558-
const THashSet<TStringBuf>* inputColumns,
574+
const TMaybe<THashSet<TStringBuf>>& inputColumns,
559575
const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx)
560576
{
561577
return std::make_unique<TUpsertUniqBuildHelper>(table, inputColumns, usedIndexes, pos, ctx);

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ class TUniqBuildHelper {
2727
virtual ~TUniqBuildHelper() = default;
2828
protected:
2929
// table - metadata of table
30-
// skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict
31-
TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
32-
NYql::TExprContext& ctx, bool skipPkCheck);
30+
TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
31+
const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool insertMode);
3332
size_t CalcComputeKeysStageOutputNum() const;
3433

3534
struct TUniqCheckNodes {
@@ -63,7 +62,8 @@ class TUniqBuildHelper {
6362
static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector,
6463
const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
6564
static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg,
66-
const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
65+
const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
66+
const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
6767
NYql::TExprContext& ctx, bool skipPkCheck);
6868

6969
virtual NYql::NNodes::TDqCnUnionAll CreateLookupStageWithConnection(const NYql::NNodes::TDqStage& computeKeysStage,
@@ -80,9 +80,9 @@ class TUniqBuildHelper {
8080
};
8181

8282
TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
83-
NYql::TPositionHandle pos, NYql::TExprContext& ctx);
83+
const TMaybe<THashSet<TStringBuf>>& inputColumns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
8484

8585
TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
86-
const THashSet<TStringBuf>* inputColumns,
86+
const TMaybe<THashSet<TStringBuf>>& inputColumns,
8787
const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
8888
}

ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRow
510510
YQL_ENSURE(condenseResult);
511511
}
512512

513-
auto helper = CreateUpsertUniqBuildHelper(table, &inputColumns, usedIndexes, pos, ctx);
513+
auto helper = CreateUpsertUniqBuildHelper(table, inputColumns, usedIndexes, pos, ctx);
514514
if (helper->GetChecksNum() == 0) {
515515
return condenseResult;
516516
}

ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,54 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) {
190190
}
191191
}
192192

193+
Y_UNIT_TEST(InsertFkPartialColumnSet) {
194+
TKikimrRunner kikimr(SyntaxV1Settings());
195+
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
196+
auto db = kikimr.GetTableClient();
197+
auto session = db.CreateSession().GetValueSync().GetSession();
198+
FillTable(session);
199+
200+
{
201+
const TString query(Q_(R"(
202+
INSERT INTO `/Root/MultiShardIndexed` (key, value) VALUES
203+
(1173915, "v1");
204+
)"));
205+
206+
auto result = ExecuteDataQuery(session, query);
207+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
208+
}
209+
210+
{
211+
const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable");
212+
const TString expected = R"([[#;[1173915u]];[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])";
213+
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
214+
}
215+
}
216+
217+
Y_UNIT_TEST(ReplaceFkPartialColumnSet) {
218+
TKikimrRunner kikimr(SyntaxV1Settings());
219+
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
220+
auto db = kikimr.GetTableClient();
221+
auto session = db.CreateSession().GetValueSync().GetSession();
222+
FillTable(session);
223+
224+
{
225+
const TString query(Q_(R"(
226+
REPLACE INTO `/Root/MultiShardIndexed` (key, value) VALUES
227+
(1173915, "v1");
228+
)"));
229+
230+
auto result = ExecuteDataQuery(session, query);
231+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
232+
}
233+
234+
{
235+
const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable");
236+
const TString expected = R"([[#;[1173915u]];[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])";
237+
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
238+
}
239+
}
240+
193241
Y_UNIT_TEST(ReplaceFkAlreadyExist) {
194242
TKikimrRunner kikimr(SyntaxV1Settings());
195243
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,45 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
737737
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
738738
}
739739

740+
{
741+
// Insert - do nothing
742+
const TString query2 = Q1_(R"(
743+
INSERT INTO `/Root/TestTable` (k1, k2, fk1) VALUES
744+
("p1str5", "p2str3", "fk1_str");
745+
)");
746+
747+
auto result = session.ExecuteDataQuery(
748+
query2,
749+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
750+
.ExtractValueSync();
751+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
752+
}
753+
754+
{
755+
const TString query2 = Q1_(R"(
756+
UPDATE `/Root/TestTable` ON (k1, k2, fk1) VALUES
757+
("p1str5", "p2str3", "fk1_str");
758+
)");
759+
760+
auto result = session.ExecuteDataQuery(
761+
query2,
762+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
763+
.ExtractValueSync();
764+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
765+
}
766+
767+
{
768+
const TString query2 = Q1_(R"(
769+
UPDATE `/Root/TestTable` SET fk1 = "fk1_str"
770+
WHERE k1 = "p1str5" AND k2 = "p2str3";
771+
)");
772+
773+
auto result = session.ExecuteDataQuery(
774+
query2,
775+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
776+
.ExtractValueSync();
777+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
778+
}
740779
}
741780

742781
Y_UNIT_TEST(UpsertMultipleUniqIndexes) {

0 commit comments

Comments
 (0)