Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ NYql::NNodes::TExprBase MakeRowsFromTupleDict(const NYql::NNodes::TDqPhyPrecompu
const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

NYql::NNodes::TMaybeNode<NYql::NNodes::TDqCnUnionAll> MakeConditionalInsertRows(const NYql::NNodes::TExprBase& input,
const NYql::TKikimrTableDescription& table, bool abortOnError, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumn, bool abortOnError,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);

enum class TKqpPhyUpsertIndexMode {
Upsert,
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ using namespace NYql::NDq;
using namespace NYql::NNodes;

TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, const TKikimrTableDescription& table,
bool abortOnError, TPositionHandle pos, TExprContext& ctx)
const TMaybe<THashSet<TStringBuf>>& inputColumns, bool abortOnError, TPositionHandle pos, TExprContext& ctx)
{
auto condenseResult = CondenseInput(input, ctx);
if (!condenseResult) {
return {};
}

auto helper = CreateInsertUniqBuildHelper(table, pos, ctx);
auto helper = CreateInsertUniqBuildHelper(table, inputColumns, pos, ctx);
auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);

auto inputPrecompute = helper->CreateInputPrecompute(computeKeysStage, pos, ctx);
Expand Down Expand Up @@ -127,7 +127,8 @@ TExprBase KqpBuildInsertStages(TExprBase node, TExprContext& ctx, const TKqpOpti
bool abortOnError = insert.OnConflict().Value() == "abort"sv;
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insert.Table().Path());

auto insertRows = MakeConditionalInsertRows(insert.Input(), table, abortOnError, insert.Pos(), ctx);
const static TMaybe<THashSet<TStringBuf>> empty;
auto insertRows = MakeConditionalInsertRows(insert.Input(), table, empty, abortOnError, insert.Pos(), ctx);
if (!insertRows) {
return node;
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
bool abortOnError = insert.OnConflict().Value() == "abort"sv;
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insert.Table().Path());

auto insertRows = MakeConditionalInsertRows(insert.Input(), table, abortOnError, insert.Pos(), ctx);
THashSet<TStringBuf> inputColumnsSet;
for (const auto& column : insert.Columns()) {
inputColumnsSet.emplace(column.Value());
}

auto insertRows = MakeConditionalInsertRows(insert.Input(), table, inputColumnsSet, abortOnError, insert.Pos(), ctx);
if (!insertRows) {
return node;
}
Expand All @@ -94,11 +99,6 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
.Connection(insertRows.Cast())
.Done();

THashSet<TStringBuf> inputColumnsSet;
for (const auto& column : insert.Columns()) {
inputColumnsSet.emplace(column.Value());
}

auto indexes = BuildSecondaryIndexVector(table, insert.Pos(), ctx);
YQL_ENSURE(indexes);

Expand Down
56 changes: 36 additions & 20 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector,

class TInsertUniqBuildHelper : public TUniqBuildHelper {
public:
TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
NYql::TExprContext& ctx)
: TUniqBuildHelper(table, nullptr, nullptr, pos, ctx, false)
TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
NYql::TPositionHandle pos, NYql::TExprContext& ctx)
: TUniqBuildHelper(table, inputColumns, nullptr, pos, ctx, true)
{}

private:
Expand Down Expand Up @@ -119,9 +119,9 @@ class TInsertUniqBuildHelper : public TUniqBuildHelper {

class TUpsertUniqBuildHelper : public TUniqBuildHelper {
public:
TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>& usedIndexes,
TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns, const THashSet<TString>& usedIndexes,
NYql::TPositionHandle pos, NYql::TExprContext& ctx)
: TUniqBuildHelper(table, inputColumns, &usedIndexes, pos, ctx, true)
: TUniqBuildHelper(table, inputColumns, &usedIndexes, pos, ctx, false)
, PkDict(MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), RowsListArg, pos, ctx))
{}

Expand Down Expand Up @@ -243,12 +243,12 @@ class TUpsertUniqBuildHelper : public TUniqBuildHelper {
}

TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg,
const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns,
const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
const TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool insertMode)
{
TVector<TUniqCheckNodes> checks;

if (!skipPkCheck) {
if (insertMode) {
checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx));
}

Expand All @@ -262,19 +262,34 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
continue;

// Compatibility with PG semantic - allow multiple null in columns with unique constaint
// NOTE: to change this it's important to consider insert and replace in case of partial column set
TVector<TCoAtom> skipNullColumns;
skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());

bool used = false;
bool skip = false;

YQL_ENSURE(inputColumns, "Attempt to check uniq constraint without given columns");

for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
used |= (!inputColumns || inputColumns->contains(column));
if (inputColumns->contains(column)) {
// Skip check if no input for index update
used = true;
} else if (insertMode) {
// In case of insert, 'column' will contain NULL for the new PK (or query will fail in case of NOT NULL)
// NULL != NULL and NULL != "any other value" so we can just skip uniq check.
skip = true;
continue;
}
TCoAtom atom(ctx.NewAtom(pos, column));
skipNullColumns.emplace_back(atom);
}

// Just to doublecheck we are not trying to update index without data to update
YQL_ENSURE(used, "Index is used but not input columns for update. Probably it's a bug."
" Index: " << table.Metadata->Indexes[i].Name);
if (!used)
continue;

if (skip)
continue;

auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
.Input(rowsListArg)
Expand All @@ -292,7 +307,7 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
return checks;
}

static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const THashSet<TStringBuf>* inputColumns,
static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const TMaybe<THashSet<TStringBuf>>& inputColumns,
TPositionHandle pos, TExprContext& ctx)
{
if (!inputColumns) {
Expand Down Expand Up @@ -326,11 +341,11 @@ static TExprNode::TPtr CreateRowsToPass(const TCoArgument& rowsListArg, const TH
.Done().Ptr();
}

TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes,
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns, const THashSet<TString>* usedIndexes,
TPositionHandle pos, TExprContext& ctx, bool insertMode)
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
, False(MakeBool(pos, false, ctx))
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck))
, Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, insertMode))
, RowsToPass(CreateRowsToPass(RowsListArg, inputColumns, pos, ctx))
{}

Expand Down Expand Up @@ -548,14 +563,15 @@ TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysSta
namespace NKikimr::NKqp::NOpt {


TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
NYql::TExprContext& ctx)
TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
const TMaybe<THashSet<TStringBuf>>& inputColumns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx)
{
return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx);
return std::make_unique<TInsertUniqBuildHelper>(table, inputColumns, pos, ctx);
}

TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
const THashSet<TStringBuf>* inputColumns,
const TMaybe<THashSet<TStringBuf>>& inputColumns,
const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx)
{
return std::make_unique<TUpsertUniqBuildHelper>(table, inputColumns, usedIndexes, pos, ctx);
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ class TUniqBuildHelper {
virtual ~TUniqBuildHelper() = default;
protected:
// table - metadata of table
// skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict
TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, bool skipPkCheck);
TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool insertMode);
size_t CalcComputeKeysStageOutputNum() const;

struct TUniqCheckNodes {
Expand Down Expand Up @@ -63,7 +62,8 @@ class TUniqBuildHelper {
static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector,
const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg,
const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumns,
const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, bool skipPkCheck);

virtual NYql::NNodes::TDqCnUnionAll CreateLookupStageWithConnection(const NYql::NNodes::TDqStage& computeKeysStage,
Expand All @@ -80,9 +80,9 @@ class TUniqBuildHelper {
};

TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);
const TMaybe<THashSet<TStringBuf>>& inputColumns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
const THashSet<TStringBuf>* inputColumns,
const TMaybe<THashSet<TStringBuf>>& inputColumns,
const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
YQL_ENSURE(condenseResult);
}

auto helper = CreateUpsertUniqBuildHelper(table, &inputColumns, usedIndexes, pos, ctx);
auto helper = CreateUpsertUniqBuildHelper(table, inputColumns, usedIndexes, pos, ctx);
if (helper->GetChecksNum() == 0) {
return condenseResult;
}
Expand Down
48 changes: 48 additions & 0 deletions ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,54 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) {
}
}

Y_UNIT_TEST(InsertFkPartialColumnSet) {
TKikimrRunner kikimr(SyntaxV1Settings());
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
FillTable(session);

{
const TString query(Q_(R"(
INSERT INTO `/Root/MultiShardIndexed` (key, value) VALUES
(1173915, "v1");
)"));

auto result = ExecuteDataQuery(session, query);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

{
const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable");
const TString expected = R"([[#;[1173915u]];[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])";
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
}
}

Y_UNIT_TEST(ReplaceFkPartialColumnSet) {
TKikimrRunner kikimr(SyntaxV1Settings());
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
FillTable(session);

{
const TString query(Q_(R"(
REPLACE INTO `/Root/MultiShardIndexed` (key, value) VALUES
(1173915, "v1");
)"));

auto result = ExecuteDataQuery(session, query);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

{
const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable");
const TString expected = R"([[#;[1173915u]];[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])";
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
}
}

Y_UNIT_TEST(ReplaceFkAlreadyExist) {
TKikimrRunner kikimr(SyntaxV1Settings());
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
Expand Down
39 changes: 39 additions & 0 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,45 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
}

{
// Insert - do nothing
const TString query2 = Q1_(R"(
INSERT INTO `/Root/TestTable` (k1, k2, fk1) VALUES
("p1str5", "p2str3", "fk1_str");
)");

auto result = session.ExecuteDataQuery(
query2,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}

{
const TString query2 = Q1_(R"(
UPDATE `/Root/TestTable` ON (k1, k2, fk1) VALUES
("p1str5", "p2str3", "fk1_str");
)");

auto result = session.ExecuteDataQuery(
query2,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}

{
const TString query2 = Q1_(R"(
UPDATE `/Root/TestTable` SET fk1 = "fk1_str"
WHERE k1 = "p1str5" AND k2 = "p2str3";
)");

auto result = session.ExecuteDataQuery(
query2,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
}

Y_UNIT_TEST(UpsertMultipleUniqIndexes) {
Expand Down
Loading