Skip to content

Commit ef17336

Browse files
committed
no-sink
1 parent 9c6fe40 commit ef17336

File tree

2 files changed

+28
-13
lines changed

2 files changed

+28
-13
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2036,7 +2036,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20362036
TDatashardTxs datashardTxs;
20372037
TEvWriteTxs evWriteTxs;
20382038
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
2039-
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());
20402039

20412040
// Single-shard datashard transactions are always immediate
20422041
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
@@ -2347,7 +2346,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23472346
// Volatile transactions must always use generic readsets
23482347
VolatileTx ||
23492348
// Transactions with topics must always use generic readsets
2350-
!topicTxs.empty());
2349+
!topicTxs.empty() ||
2350+
// HTAP transactions always use generic readsets
2351+
!evWriteTxs.empty());
23512352

23522353
if (!locksMap.empty() || VolatileTx ||
23532354
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
@@ -2469,12 +2470,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24692470
std::sort(receivingShards.begin(), receivingShards.end());
24702471

24712472
for (auto& [shardId, shardTx] : datashardTxs) {
2472-
AFL_ENSURE(!columnShardArbiter);
24732473
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
2474-
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
2475-
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
2476-
if (arbiter) {
2477-
shardTx->MutableLocks()->SetArbiterShard(arbiter);
2474+
if (columnShardArbiter) {
2475+
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
2476+
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
2477+
if (sendingShardsSet.contains(shardId)) {
2478+
shardTx->MutableLocks()->AddSendingShards(shardId);
2479+
}
2480+
if (receivingShardsSet.contains(shardId)) {
2481+
shardTx->MutableLocks()->AddReceivingShards(shardId);
2482+
}
2483+
AFL_ENSURE(!arbiter);
2484+
} else {
2485+
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
2486+
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
2487+
if (arbiter) {
2488+
shardTx->MutableLocks()->SetArbiterShard(arbiter);
2489+
}
24782490
}
24792491
}
24802492

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3037,10 +3037,10 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30373037
}
30383038
}
30393039

3040-
Y_UNIT_TEST(TableSink_Htap) {
3040+
Y_UNIT_TEST_TWIN(TableSink_Htap, withOltpSink) {
30413041
NKikimrConfig::TAppConfig appConfig;
30423042
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
3043-
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
3043+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink);
30443044
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
30453045
auto settings = TKikimrSettings()
30463046
.SetAppConfig(appConfig)
@@ -3073,18 +3073,21 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30733073
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
30743074

30753075
auto client = kikimr.GetQueryClient();
3076+
30763077
{
3077-
auto prepareResult = client.ExecuteQuery(R"(
3078+
auto result = client.ExecuteQuery(R"(
30783079
UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
30793080
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
30803081
UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
30813082
(10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13);
30823083
INSERT INTO `/Root/ColumnShard` SELECT * FROM `/Root/DataShard`;
30833084
REPLACE INTO `/Root/DataShard` SELECT * FROM `/Root/ColumnShard`;
3084-
SELECT * FROM `/Root/ColumnShard`;
3085-
SELECT * FROM `/Root/DataShard`;
3085+
SELECT * FROM `/Root/ColumnShard` ORDER BY Col1;
3086+
SELECT * FROM `/Root/DataShard` ORDER BY Col1;
30863087
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3087-
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
3088+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3089+
CompareYson(R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[10u;["test1"];10];[20u;["test2"];11];[30u;["test3"];12];[40u;#;13]])", FormatResultSetYson(result.GetResultSet(0)));
3090+
CompareYson(R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[10u;["test1"];10];[20u;["test2"];11];[30u;["test3"];12];[40u;#;13]])", FormatResultSetYson(result.GetResultSet(1)));
30883091
}
30893092
}
30903093

0 commit comments

Comments
 (0)