Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions ydb/core/kqp/common/kqp_tx_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,17 @@ class TKqpTransactionManager : public IKqpTransactionManager {
for (auto& [shardId, shardInfo] : ShardsInfo) {
if ((shardInfo.Flags & EAction::WRITE)) {
ReceivingShards.insert(shardId);
if (shardInfo.IsOlap) {
receivingColumnShardsSet.insert(shardId);
}
if (IsVolatile()) {
SendingShards.insert(shardId);
}
if (shardInfo.IsOlap) {
sendingColumnShardsSet.insert(shardId);
}
}
if (!shardInfo.Locks.empty()) {
SendingShards.insert(shardId);
if (shardInfo.IsOlap) {
receivingColumnShardsSet.insert(shardId);
sendingColumnShardsSet.insert(shardId);
}
}

Expand Down Expand Up @@ -325,6 +325,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
auto arbiterIterator = std::begin(shards);
std::advance(arbiterIterator, index);
ArbiterColumnShard = *arbiterIterator;
ReceivingShards.insert(*ArbiterColumnShard);
}

ShardsToWaitPrepare = ShardsIds;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2561,6 +2561,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto arbiterIterator = std::begin(shards);
std::advance(arbiterIterator, index);
columnShardArbiter = *arbiterIterator;
receivingShardsSet.insert(*columnShardArbiter);
}
}

Expand Down
28 changes: 19 additions & 9 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,18 +315,28 @@ class TCommitOperation {
LockId = lock.GetLockId();
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty();
if (!singleShardTx) {
if (SendingShards.empty() != ReceivingShards.empty()) {
return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)");
}
if (ReceivingShards.size() && SendingShards.size()) {
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
return TConclusionStatus::Fail("shard is absent in sending and receiving lists");
return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists");
}
if (locks.HasArbiterColumnShard()) {
ArbiterColumnShard = locks.GetArbiterColumnShard();
} else {
AFL_VERIFY(!ReceivingShards.empty());
ArbiterColumnShard = *ReceivingShards.begin();
if (!locks.HasArbiterColumnShard()) {
return TConclusionStatus::Fail("no arbiter info in request");
}
ArbiterColumnShard = locks.GetArbiterColumnShard();

if (IsPrimary() && !ReceivingShards.contains(ArbiterColumnShard)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)(
"receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards));
return TConclusionStatus::Fail("arbiter is absent in receiving lists");
}
if (!IsPrimary() && (!ReceivingShards.contains(ArbiterColumnShard) || !SendingShards.contains(ArbiterColumnShard))) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)(
"receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards));
return TConclusionStatus::Fail("arbiter is absent in sending or receiving lists");
}
AFL_VERIFY(ArbiterColumnShard);
}

Generation = lock.GetGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
};

virtual bool IsTxBroken() const override {
return TxBroken.value_or(false);
AFL_VERIFY(TxBroken);
return *TxBroken;
}

void InitializeRequests(TColumnShard& owner) {
Expand Down
Loading