Skip to content

Commit 2045719

Browse files
authored
Merge 99ba5a8 into 73261ff
2 parents 73261ff + 99ba5a8 commit 2045719

File tree

2 files changed

+84
-4
lines changed

2 files changed

+84
-4
lines changed

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -312,17 +312,18 @@ class TKqpQueryState : public TNonCopyable {
312312

313313
bool NeedPersistentSnapshot() const {
314314
auto type = GetType();
315-
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
316-
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
317-
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
318-
}
319315
return (
320316
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
321317
type == NKikimrKqp::QUERY_TYPE_AST_SCAN
322318
);
323319
}
324320

325321
bool NeedSnapshot(const NYql::TKikimrConfiguration& config) const {
322+
auto type = GetType();
323+
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
324+
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
325+
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
326+
}
326327
return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
327328
}
328329

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3533,6 +3533,85 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
35333533
}
35343534
}
35353535

3536+
Y_UNIT_TEST(ReadDatashardAndColumnshard) {
3537+
NKikimrConfig::TAppConfig appConfig;
3538+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);
3539+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(false);
3540+
auto settings = TKikimrSettings()
3541+
.SetAppConfig(appConfig)
3542+
.SetWithSampleTables(false);
3543+
3544+
TKikimrRunner kikimr(settings);
3545+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
3546+
3547+
// auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
3548+
auto client = kikimr.GetQueryClient();
3549+
3550+
{
3551+
auto createTable = client.ExecuteQuery(R"sql(
3552+
CREATE TABLE `/Root/DataShard` (
3553+
Col1 Uint64 NOT NULL,
3554+
Col2 Int32,
3555+
Col3 String,
3556+
PRIMARY KEY (Col1)
3557+
) WITH (
3558+
STORE = ROW,
3559+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
3560+
);
3561+
CREATE TABLE `/Root/ColumnShard` (
3562+
Col1 Uint64 NOT NULL,
3563+
Col2 Int32,
3564+
Col3 String,
3565+
PRIMARY KEY (Col1)
3566+
) WITH (
3567+
STORE = COLUMN,
3568+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
3569+
);
3570+
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
3571+
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
3572+
}
3573+
3574+
{
3575+
auto replaceValues = client.ExecuteQuery(R"sql(
3576+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
3577+
(1u, "row", 1);
3578+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3579+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
3580+
}
3581+
3582+
{
3583+
auto replaceValues = client.ExecuteQuery(R"sql(
3584+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
3585+
(2u, "column", 2);
3586+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3587+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
3588+
}
3589+
3590+
{
3591+
auto it = client.StreamExecuteQuery(R"sql(
3592+
SELECT * FROM `/Root/ColumnShard`;
3593+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3594+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3595+
TString output = StreamResultToYson(it);
3596+
CompareYson(
3597+
output,
3598+
R"([])");
3599+
}
3600+
3601+
{
3602+
auto it = client.StreamExecuteQuery(R"sql(
3603+
SELECT * FROM `/Root/DataShard`;
3604+
UNION
3605+
SELECT * FROM `/Root/ColumnShard`;
3606+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3607+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3608+
TString output = StreamResultToYson(it);
3609+
CompareYson(
3610+
output,
3611+
R"([])");
3612+
}
3613+
}
3614+
35363615
Y_UNIT_TEST(ReplaceIntoWithDefaultValue) {
35373616
NKikimrConfig::TAppConfig appConfig;
35383617
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);

0 commit comments

Comments
 (0)