Skip to content

Commit 5299845

Browse files
authored
Do not clear shard lists (#13189)
1 parent 2876ba4 commit 5299845

File tree

2 files changed

+18
-22
lines changed

2 files changed

+18
-22
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class TCommitOperation {
295295
using TPtr = std::shared_ptr<TCommitOperation>;
296296

297297
bool NeedSyncLocks() const {
298-
return SendingShards.size() && ReceivingShards.size();
298+
return SendingShards.size() || ReceivingShards.size();
299299
}
300300

301301
bool IsPrimary() const {
@@ -308,29 +308,28 @@ class TCommitOperation {
308308
}
309309

310310
TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) {
311-
AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() >= 1);
312-
auto& locks = evWrite.Record.GetLocks();
313-
auto& lock = evWrite.Record.GetLocks().GetLocks()[0];
311+
TxId = evWrite.Record.GetTxId();
312+
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tx_id", TxId);
313+
const auto& locks = evWrite.Record.GetLocks();
314+
AFL_VERIFY(!locks.GetLocks().empty());
315+
auto& lock = locks.GetLocks()[0];
316+
LockId = lock.GetLockId();
314317
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
315318
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
316-
if (!ReceivingShards.size() || !SendingShards.size()) {
317-
ReceivingShards.clear();
318-
SendingShards.clear();
319-
} else if (!locks.HasArbiterColumnShard()) {
320-
ArbiterColumnShard = *ReceivingShards.begin();
319+
const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty();
320+
if (!singleShardTx) {
321321
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
322-
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
322+
return TConclusionStatus::Fail("shard is absent in sending and receiving lists");
323323
}
324-
} else {
325-
ArbiterColumnShard = locks.GetArbiterColumnShard();
326-
AFL_VERIFY(ArbiterColumnShard);
327-
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
328-
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
324+
if (locks.HasArbiterColumnShard()) {
325+
ArbiterColumnShard = locks.GetArbiterColumnShard();
326+
} else {
327+
AFL_VERIFY(!ReceivingShards.empty());
328+
ArbiterColumnShard = *ReceivingShards.begin();
329329
}
330+
AFL_VERIFY(ArbiterColumnShard);
330331
}
331332

332-
TxId = evWrite.Record.GetTxId();
333-
LockId = lock.GetLockId();
334333
Generation = lock.GetGeneration();
335334
InternalGenerationCounter = lock.GetCounter();
336335
if (!GetLockId()) {
@@ -339,15 +338,14 @@ class TCommitOperation {
339338
if (!TxId) {
340339
return TConclusionStatus::Fail("not initialized TxId for commit event");
341340
}
342-
if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) {
341+
if (locks.GetOp() != NKikimrDataEvents::TKqpLocks::Commit) {
343342
return TConclusionStatus::Fail("incorrect message type");
344343
}
345344
return TConclusionStatus::Success();
346345
}
347346

348347
std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
349348
const NKikimrTxColumnShard::ETransactionKind kind) const {
350-
AFL_VERIFY(ReceivingShards.size());
351349
if (IsPrimary()) {
352350
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
353351
TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards);

ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
6363
for (auto&& i : protoData.GetWaitShardsResultAck()) {
6464
WaitShardsResultAck.emplace(i);
6565
}
66-
AFL_VERIFY(ReceivingShards.empty() == SendingShards.empty());
6766
if (protoData.HasTxBroken()) {
6867
TxBroken = protoData.GetTxBroken();
6968
}
@@ -160,8 +159,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
160159
};
161160

162161
virtual bool IsTxBroken() const override {
163-
AFL_VERIFY(TxBroken);
164-
return *TxBroken;
162+
return TxBroken.value_or(false);
165163
}
166164

167165
void InitializeRequests(TColumnShard& owner) {

0 commit comments

Comments
 (0)