Skip to content

Commit c2436ed

Browse files
authored
ColumnShard + DataShard reads (#6800)
1 parent 5ff39ba commit c2436ed

File tree

4 files changed

+106
-24
lines changed

4 files changed

+106
-24
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
166166
for (const auto &input : stage.GetInputs()) {
167167
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
168168
}
169+
170+
for (const auto &tableOp : stage.GetTableOps()) {
171+
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
172+
// always need snapshot for OLAP reads
173+
return true;
174+
}
175+
}
169176
}
170177
}
171178

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -312,10 +312,6 @@ class TKqpQueryState : public TNonCopyable {
312312

313313
bool NeedPersistentSnapshot() const {
314314
auto type = GetType();
315-
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
316-
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
317-
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
318-
}
319315
return (
320316
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
321317
type == NKikimrKqp::QUERY_TYPE_AST_SCAN

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -837,9 +837,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
837837
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
838838
HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
839839
HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
840-
if (HasOlapTable && HasOltpTable) {
840+
HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
841+
if (HasOlapTable && HasOltpTable && HasTableWrite) {
841842
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
842-
"Transactions between column and row tables are disabled at current time.");
843+
"Write transactions between column and row tables are disabled at current time.");
843844
return false;
844845
}
845846
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
@@ -2536,6 +2537,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
25362537

25372538
bool HasOlapTable = false;
25382539
bool HasOltpTable = false;
2540+
bool HasTableWrite = false;
25392541

25402542
TGUCSettings::TPtr GUCSettings;
25412543
};

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2790,7 +2790,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
27902790
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
27912791
UNIT_ASSERT(!insertResult.IsSuccess());
27922792
UNIT_ASSERT_C(
2793-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2793+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
27942794
insertResult.GetIssues().ToString());
27952795
}
27962796

@@ -2803,20 +2803,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28032803
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28042804
UNIT_ASSERT(!insertResult.IsSuccess());
28052805
UNIT_ASSERT_C(
2806-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2807-
insertResult.GetIssues().ToString());
2808-
}
2809-
2810-
{
2811-
// column & row read
2812-
const TString sql = R"(
2813-
SELECT * FROM `/Root/DataShard`;
2814-
SELECT * FROM `/Root/ColumnShard`;
2815-
)";
2816-
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
2817-
UNIT_ASSERT(!insertResult.IsSuccess());
2818-
UNIT_ASSERT_C(
2819-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2806+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28202807
insertResult.GetIssues().ToString());
28212808
}
28222809

@@ -2831,7 +2818,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28312818
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28322819
UNIT_ASSERT(!insertResult.IsSuccess());
28332820
UNIT_ASSERT_C(
2834-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2821+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28352822
insertResult.GetIssues().ToString());
28362823
}
28372824

@@ -2845,7 +2832,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28452832
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28462833
UNIT_ASSERT(!insertResult.IsSuccess());
28472834
UNIT_ASSERT_C(
2848-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2835+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28492836
insertResult.GetIssues().ToString());
28502837
}
28512838

@@ -2859,7 +2846,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
28592846
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
28602847
UNIT_ASSERT(!insertResult.IsSuccess());
28612848
UNIT_ASSERT_C(
2862-
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
2849+
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
28632850
insertResult.GetIssues().ToString());
28642851
}
28652852
}
@@ -3533,6 +3520,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
35333520
}
35343521
}
35353522

3523+
Y_UNIT_TEST(ReadDatashardAndColumnshard) {
3524+
NKikimrConfig::TAppConfig appConfig;
3525+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
3526+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
3527+
auto settings = TKikimrSettings()
3528+
.SetAppConfig(appConfig)
3529+
.SetWithSampleTables(false);
3530+
3531+
TKikimrRunner kikimr(settings);
3532+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
3533+
3534+
auto client = kikimr.GetQueryClient();
3535+
3536+
{
3537+
auto createTable = client.ExecuteQuery(R"sql(
3538+
CREATE TABLE `/Root/DataShard` (
3539+
Col1 Uint64 NOT NULL,
3540+
Col2 Int32,
3541+
Col3 String,
3542+
PRIMARY KEY (Col1)
3543+
) WITH (
3544+
STORE = ROW,
3545+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
3546+
);
3547+
CREATE TABLE `/Root/ColumnShard` (
3548+
Col1 Uint64 NOT NULL,
3549+
Col2 Int32,
3550+
Col3 String,
3551+
PRIMARY KEY (Col1)
3552+
) WITH (
3553+
STORE = COLUMN,
3554+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
3555+
);
3556+
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
3557+
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
3558+
}
3559+
3560+
{
3561+
auto replaceValues = client.ExecuteQuery(R"sql(
3562+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
3563+
(1u, 1, "row");
3564+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3565+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
3566+
}
3567+
3568+
{
3569+
auto replaceValues = client.ExecuteQuery(R"sql(
3570+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
3571+
(2u, 2, "column");
3572+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3573+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
3574+
}
3575+
3576+
{
3577+
auto it = client.StreamExecuteQuery(R"sql(
3578+
SELECT * FROM `/Root/ColumnShard`;
3579+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3580+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3581+
TString output = StreamResultToYson(it);
3582+
CompareYson(
3583+
output,
3584+
R"([[2u;[2];["column"]]])");
3585+
}
3586+
3587+
{
3588+
auto it = client.StreamExecuteQuery(R"sql(
3589+
SELECT * FROM `/Root/DataShard`
3590+
UNION ALL
3591+
SELECT * FROM `/Root/ColumnShard`;
3592+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3593+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3594+
TString output = StreamResultToYson(it);
3595+
CompareYson(
3596+
output,
3597+
R"([[1u;[1];["row"]];[2u;[2];["column"]]])");
3598+
}
3599+
3600+
{
3601+
auto it = client.StreamExecuteQuery(R"sql(
3602+
SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r
3603+
JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1;
3604+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3605+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3606+
TString output = StreamResultToYson(it);
3607+
CompareYson(
3608+
output,
3609+
R"([[["row"];["column"]]])");
3610+
}
3611+
}
3612+
35363613
Y_UNIT_TEST(ReplaceIntoWithDefaultValue) {
35373614
NKikimrConfig::TAppConfig appConfig;
35383615
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);

0 commit comments

Comments
 (0)