Skip to content

ColumnShard + DataShard reads #6800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
for (const auto &input : stage.GetInputs()) {
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
}

for (const auto &tableOp : stage.GetTableOps()) {
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
// always need snapshot for OLAP reads
return true;
}
}
}
}

Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,6 @@ class TKqpQueryState : public TNonCopyable {

bool NeedPersistentSnapshot() const {
auto type = GetType();
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
}
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_AST_SCAN
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (HasOlapTable && HasOltpTable) {
HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (HasOlapTable && HasOltpTable && HasTableWrite) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Transactions between column and row tables are disabled at current time.");
"Write transactions between column and row tables are disabled at current time.");
return false;
}
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
Expand Down Expand Up @@ -2534,6 +2535,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TGUCSettings::TPtr GUCSettings;
};
Expand Down
113 changes: 95 additions & 18 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2790,7 +2790,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2803,20 +2803,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

{
// column & row read
const TString sql = R"(
SELECT * FROM `/Root/DataShard`;
SELECT * FROM `/Root/ColumnShard`;
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2831,7 +2818,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2845,7 +2832,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2859,7 +2846,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
}
Expand Down Expand Up @@ -3534,6 +3521,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(ReadDatashardAndColumnshard) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto client = kikimr.GetQueryClient();

{
auto createTable = client.ExecuteQuery(R"sql(
CREATE TABLE `/Root/DataShard` (
Col1 Uint64 NOT NULL,
Col2 Int32,
Col3 String,
PRIMARY KEY (Col1)
) WITH (
STORE = ROW,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
);
CREATE TABLE `/Root/ColumnShard` (
Col1 Uint64 NOT NULL,
Col2 Int32,
Col3 String,
PRIMARY KEY (Col1)
) WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
);
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
}

{
auto replaceValues = client.ExecuteQuery(R"sql(
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
(1u, 1, "row");
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
}

{
auto replaceValues = client.ExecuteQuery(R"sql(
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
(2u, 2, "column");
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT * FROM `/Root/ColumnShard`;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[2u;[2];["column"]]])");
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT * FROM `/Root/DataShard`
UNION ALL
SELECT * FROM `/Root/ColumnShard`;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[1u;[1];["row"]];[2u;[2];["column"]]])");
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r
JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[["row"];["column"]]])");
}
}

Y_UNIT_TEST(ReplaceIntoWithDefaultValue) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);
Expand Down
Loading