Skip to content

Temporary disable mixed column/row tables transactions #1412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class TKqpQueryState : public TNonCopyable {
auto type = GetType();
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery());
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
}
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
if ((::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery))
&& (::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery))) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Transactions between column and row tables are disabled at current time.");
return false;
}
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
EKikimrQueryType::Dml);
Expand Down
51 changes: 50 additions & 1 deletion ydb/core/kqp/session_actor/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return readPhases > 1;
}

bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
for (const auto &tableOp : stage.GetTableOps()) {
Expand All @@ -191,5 +191,54 @@ bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
return true;
}
}
}
}
return false;
}

bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
for (const auto &tableOp : stage.GetTableOps()) {
switch (tableOp.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kReadRange:
case NKqpProto::TKqpPhyTableOperation::kLookup:
case NKqpProto::TKqpPhyTableOperation::kReadRanges:
return true;
case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
case NKqpProto::TKqpPhyTableOperation::kUpsertRows:
case NKqpProto::TKqpPhyTableOperation::kDeleteRows:
break;
default:
YQL_ENSURE(false, "unexpected type");
}
}
}
}
return false;
}

bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
for (const auto &tableOp : stage.GetTableOps()) {
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows
|| tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) {
return true;
}
}
}
}
return false;
}

} // namespace NKqp
} // namespace NKikimr
5 changes: 4 additions & 1 deletion ydb/core/kqp/session_actor/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TTyp
bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx,
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);

} // namespace NKikimr::NKqp
84 changes: 50 additions & 34 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5514,6 +5514,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

const TString query = R"(
CREATE TABLE `/Root/ColumnSource` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);

CREATE TABLE `/Root/DataShard1` (
Col1 Uint64 NOT NULL,
Col2 String,
Expand All @@ -5540,6 +5549,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);

CREATE TABLE `/Root/ColumnShard3` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);

CREATE TABLE `/Root/DataShard2` (
Col1 Uint64 NOT NULL,
Col2 String,
Expand All @@ -5553,34 +5571,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto client = kikimr.GetQueryClient();
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());

{
// row -> column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard1`
SELECT * FROM `/Root/DataShard1`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());

auto it = client.StreamExecuteQuery(R"(
SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3;
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/ColumnSource` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(output, R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])");
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
// Missing Nullable column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard1`
SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard1`
SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/ColumnSource`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
Expand All @@ -5592,14 +5595,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
R"([[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
}

{
// column -> column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard2`
SELECT * FROM `/Root/ColumnShard1`
SELECT * FROM `/Root/ColumnSource`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
Expand All @@ -5611,26 +5614,39 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])");
}

{
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());

// row -> column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard3`
SELECT * FROM `/Root/DataShard1`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

{
// column -> row
const TString sql = R"(
REPLACE INTO `/Root/DataShard2`
SELECT * FROM `/Root/ColumnShard2`
SELECT * FROM `/Root/ColumnSource`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());

auto it = client.StreamExecuteQuery(R"(
SELECT * FROM `/Root/DataShard2` ORDER BY Col1, Col2, Col3;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
}

Expand Down