Skip to content

Commit ed4a793

Browse files
authored
Merge 320f463 into d7df471
2 parents d7df471 + 320f463 commit ed4a793

File tree

4 files changed

+45
-10
lines changed

4 files changed

+45
-10
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
6565
auto& shardInfo = ShardsInfo.at(shardId);
6666
if (auto lockPtr = shardInfo.Locks.FindPtr(lock.GetKey()); lockPtr) {
6767
if (lock.Proto.GetHasWrites()) {
68+
AFL_ENSURE(!ReadOnly);
6869
lockPtr->Lock.Proto.SetHasWrites(true);
6970
}
7071

@@ -186,7 +187,12 @@ class TKqpTransactionManager : public IKqpTransactionManager {
186187
}
187188

188189
bool IsVolatile() const override {
189-
return !HasOlapTable();
190+
return !HasOlapTable()
191+
&& !IsReadOnly()
192+
&& !IsSingleShard();
193+
194+
// TODO: && !HasPersistentChannels;
195+
// Note: currently persistent channels are never used
190196
}
191197

192198
bool HasSnapshot() const override {
@@ -213,10 +219,15 @@ class TKqpTransactionManager : public IKqpTransactionManager {
213219
return ShardsIds.size();
214220
}
215221

222+
bool NeedCommit() const override {
223+
const bool dontNeedCommit = IsReadOnly() && (IsSingleShard() || HasSnapshot());
224+
return !dontNeedCommit;
225+
}
226+
216227
void StartPrepare() override {
217228
AFL_ENSURE(!CollectOnly);
218229
AFL_ENSURE(State == ETransactionState::COLLECTING);
219-
AFL_ENSURE(!IsReadOnly());
230+
AFL_ENSURE(NeedCommit());
220231

221232
THashSet<ui64> sendingColumnShardsSet;
222233
THashSet<ui64> receivingColumnShardsSet;
@@ -242,8 +253,6 @@ class TKqpTransactionManager : public IKqpTransactionManager {
242253
shardInfo.State = EShardState::PREPARING;
243254
}
244255

245-
Y_ABORT_UNLESS(!ReceivingShards.empty());
246-
247256
constexpr size_t minArbiterMeshSize = 5;
248257
if ((IsVolatile() &&
249258
ReceivingShards.size() >= minArbiterMeshSize))
@@ -286,7 +295,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
286295

287296
TPrepareInfo GetPrepareTransactionInfo() override {
288297
AFL_ENSURE(State == ETransactionState::PREPARING);
289-
AFL_ENSURE(!ReceivingShards.empty());
298+
AFL_ENSURE(!ReceivingShards.empty() || !SendingShards.empty());
290299

291300
TPrepareInfo result {
292301
.SendingShards = SendingShards,
@@ -323,7 +332,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
323332
AFL_ENSURE(State == ETransactionState::PREPARING
324333
|| (State == ETransactionState::COLLECTING
325334
&& IsSingleShard()));
326-
AFL_ENSURE(!IsReadOnly());
335+
AFL_ENSURE(NeedCommit());
327336
State = ETransactionState::EXECUTING;
328337

329338
for (auto& [_, shardInfo] : ShardsInfo) {
@@ -360,9 +369,24 @@ class TKqpTransactionManager : public IKqpTransactionManager {
360369
AFL_ENSURE(shardInfo.State == EShardState::EXECUTING);
361370
shardInfo.State = EShardState::FINISHED;
362371

363-
// Either all shards committed or all shards failed,
364-
// so we need to wait only for one answer from ReceivingShards.
365-
return ReceivingShards.contains(shardId) || IsSingleShard();
372+
if (IsSingleShard() || ReceivingShards.contains(shardId)) {
373+
// Either all shards committed write or all shards failed,
374+
// so we need to wait only for one answer from ReceivingShards.
375+
return true;
376+
} else if (IsReadOnly() && !HasSnapshot()) {
377+
AFL_ENSURE(ReceivingShards.empty());
378+
// NOTE: In this case we have a possible RW transaction, that didn't write anything.
379+
// For example, statement 'update dst set ... where ...' or 'insert into dst select from src where ...'.
380+
// So, it's ok to use distributed commit in this case,
381+
// because in general case (possible RW tx is RW tx) tx will be executed faster
382+
// due to absence of taking snapshot (up to 10ms).
383+
384+
// In case of read only multishard tx without snapshot,
385+
// we need to wait for all shards answers (to check locks).
386+
AFL_ENSURE(SendingShards.erase(shardId) == 1);
387+
return SendingShards.empty();
388+
}
389+
return false;
366390
}
367391

368392
private:

ydb/core/kqp/common/kqp_tx_manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class IKqpTransactionManager {
7070
virtual const THashSet<ui64>& GetShards() const = 0;
7171
virtual ui64 GetShardsCount() const = 0;
7272

73+
virtual bool NeedCommit() const = 0;
74+
7375
virtual void StartPrepare() = 0;
7476

7577
struct TPrepareInfo {

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1763,7 +1763,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17631763
for (auto& [_, info] : WriteInfos) {
17641764
info.WriteTableActor->FlushBuffers();
17651765
}
1766-
if (TxManager->IsReadOnly()) {
1766+
1767+
if (!TxManager->NeedCommit()) {
17671768
Rollback();
17681769
State = EState::FINISHED;
17691770
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4309,6 +4309,14 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
43094309
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
43104310
}
43114311

4312+
{
4313+
// Empty replace
4314+
auto prepareResult = client.ExecuteQuery(R"(
4315+
REPLACE INTO `/Root/DataShard2` SELECT * FROM `/Root/DataShard` WHERE Col2 == 'not exists';
4316+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
4317+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4318+
}
4319+
43124320
{
43134321
auto it = client.StreamExecuteQuery(R"(
43144322
SELECT COUNT(*) FROM `/Root/DataShard2`;

0 commit comments

Comments
 (0)