Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions ydb/core/kqp/common/kqp_tx_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
auto& shardInfo = ShardsInfo.at(shardId);
if (auto lockPtr = shardInfo.Locks.FindPtr(lock.GetKey()); lockPtr) {
if (lock.Proto.GetHasWrites()) {
AFL_ENSURE(!ReadOnly);
lockPtr->Lock.Proto.SetHasWrites(true);
}

Expand Down Expand Up @@ -186,7 +187,12 @@ class TKqpTransactionManager : public IKqpTransactionManager {
}

bool IsVolatile() const override {
return !HasOlapTable();
return !HasOlapTable()
&& !IsReadOnly()
&& !IsSingleShard();

// TODO: && !HasPersistentChannels;
// Note: currently persistent channels are never used
}

bool HasSnapshot() const override {
Expand All @@ -213,10 +219,15 @@ class TKqpTransactionManager : public IKqpTransactionManager {
return ShardsIds.size();
}

bool NeedCommit() const override {
const bool dontNeedCommit = IsReadOnly() && (IsSingleShard() || HasSnapshot());
return !dontNeedCommit;
}

void StartPrepare() override {
AFL_ENSURE(!CollectOnly);
AFL_ENSURE(State == ETransactionState::COLLECTING);
AFL_ENSURE(!IsReadOnly());
AFL_ENSURE(NeedCommit());

THashSet<ui64> sendingColumnShardsSet;
THashSet<ui64> receivingColumnShardsSet;
Expand All @@ -242,8 +253,6 @@ class TKqpTransactionManager : public IKqpTransactionManager {
shardInfo.State = EShardState::PREPARING;
}

Y_ABORT_UNLESS(!ReceivingShards.empty());

constexpr size_t minArbiterMeshSize = 5;
if ((IsVolatile() &&
ReceivingShards.size() >= minArbiterMeshSize))
Expand Down Expand Up @@ -286,7 +295,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {

TPrepareInfo GetPrepareTransactionInfo() override {
AFL_ENSURE(State == ETransactionState::PREPARING);
AFL_ENSURE(!ReceivingShards.empty());
AFL_ENSURE(!ReceivingShards.empty() || !SendingShards.empty());

TPrepareInfo result {
.SendingShards = SendingShards,
Expand Down Expand Up @@ -323,7 +332,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
AFL_ENSURE(State == ETransactionState::PREPARING
|| (State == ETransactionState::COLLECTING
&& IsSingleShard()));
AFL_ENSURE(!IsReadOnly());
AFL_ENSURE(NeedCommit());
State = ETransactionState::EXECUTING;

for (auto& [_, shardInfo] : ShardsInfo) {
Expand Down Expand Up @@ -360,9 +369,24 @@ class TKqpTransactionManager : public IKqpTransactionManager {
AFL_ENSURE(shardInfo.State == EShardState::EXECUTING);
shardInfo.State = EShardState::FINISHED;

// Either all shards committed or all shards failed,
// so we need to wait only for one answer from ReceivingShards.
return ReceivingShards.contains(shardId) || IsSingleShard();
if (IsSingleShard() || ReceivingShards.contains(shardId)) {
// Either all shards committed write or all shards failed,
// so we need to wait only for one answer from ReceivingShards.
return true;
} else if (IsReadOnly() && !HasSnapshot()) {
AFL_ENSURE(ReceivingShards.empty());
// NOTE: In this case we have a possible RW transaction, that didn't write anything.
// For example, statement 'update dst set ... where ...' or 'insert into dst select from src where ...'.
// So, it's ok to use distributed commit in this case,
// because in general case (possible RW tx is RW tx) tx will be executed faster
// due to absence of taking snapshot (up to 10ms).

// In case of read only multishard tx without snapshot,
// we need to wait for all shards answers (to check locks).
AFL_ENSURE(SendingShards.erase(shardId) == 1);
return SendingShards.empty();
}
return false;
}

private:
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/kqp_tx_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class IKqpTransactionManager {
virtual const THashSet<ui64>& GetShards() const = 0;
virtual ui64 GetShardsCount() const = 0;

virtual bool NeedCommit() const = 0;

virtual void StartPrepare() = 0;

struct TPrepareInfo {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1763,7 +1763,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
for (auto& [_, info] : WriteInfos) {
info.WriteTableActor->FlushBuffers();
}
if (TxManager->IsReadOnly()) {

if (!TxManager->NeedCommit()) {
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4309,6 +4309,14 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
// Empty replace
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/DataShard2` SELECT * FROM `/Root/DataShard` WHERE Col2 == 'not exists';
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto it = client.StreamExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/DataShard2`;
Expand Down
Loading