Skip to content

Commit 76e77cf

Browse files
[*] if the conditions are violated, the transaction is terminated with the code ABORTED
1 parent e959a99 commit 76e77cf

File tree

2 files changed

+40
-13
lines changed

2 files changed

+40
-13
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,7 @@ struct TEvPQ {
816816
ui64 TxId;
817817
TVector<NKikimrPQ::TPartitionOperation> Operations;
818818
TActorId SupportivePartitionActor;
819+
bool ForceFalse = false;
819820
};
820821

821822
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {

ydb/core/persqueue/pq_impl.cpp

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3906,10 +3906,22 @@ TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti
39063906
void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
39073907
TDistributedTransaction& tx)
39083908
{
3909+
auto OriginalPartitionExists = [this](ui32 partitionId) {
3910+
return Partitions.contains(TPartitionId(partitionId));
3911+
};
3912+
3913+
// if the predicate is violated, the transaction will end with the ABORTED code
3914+
bool forceFalse = false;
39093915
THashMap<ui32, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events;
39103916

39113917
for (auto& operation : tx.Operations) {
39123918
ui32 originalPartitionId = operation.GetPartitionId();
3919+
3920+
if (!OriginalPartitionExists(originalPartitionId)) {
3921+
forceFalse = true;
3922+
continue;
3923+
}
3924+
39133925
auto& event = events[originalPartitionId];
39143926
if (!event) {
39153927
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
@@ -3924,29 +3936,43 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
39243936

39253937
if (tx.WriteId.Defined()) {
39263938
const TWriteId& writeId = *tx.WriteId;
3927-
Y_ABORT_UNLESS(TxWrites.contains(writeId),
3928-
"PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
3929-
TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId);
3930-
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
3939+
if (TxWrites.contains(writeId)) {
3940+
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
3941+
3942+
for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) {
3943+
if (!OriginalPartitionExists(originalPartitionId)) {
3944+
PQ_LOG_W("Unknown partition " << originalPartitionId << " for TxId " << tx.TxId);
3945+
forceFalse = true;
3946+
continue;
3947+
}
39313948

3932-
for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) {
3933-
Y_ABORT_UNLESS(Partitions.contains(partitionId));
3934-
const TPartitionInfo& partition = Partitions.at(partitionId);
3949+
auto& event = events[originalPartitionId];
3950+
if (!event) {
3951+
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
3952+
}
39353953

3936-
auto& event = events[originalPartitionId];
3937-
if (!event) {
3938-
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
3939-
}
3954+
if (!Partitions.contains(partitionId)) {
3955+
PQ_LOG_W("Unknown partition " << partitionId << " for TxId " << tx.TxId);
3956+
forceFalse = true;
3957+
continue;
3958+
}
3959+
3960+
const TPartitionInfo& partition = Partitions.at(partitionId);
39403961

3941-
event->SupportivePartitionActor = partition.Actor;
3962+
event->SupportivePartitionActor = partition.Actor;
3963+
}
3964+
} else {
3965+
PQ_LOG_W("Unknown WriteId " << writeId << " for TxId " << tx.TxId);
3966+
forceFalse = true;
39423967
}
39433968
}
39443969

39453970
for (auto& [originalPartitionId, event] : events) {
39463971
TPartitionId partitionId(originalPartitionId);
3947-
Y_ABORT_UNLESS(Partitions.contains(partitionId));
39483972
const TPartitionInfo& partition = Partitions.at(partitionId);
39493973

3974+
event->ForceFalse = forceFalse;
3975+
39503976
ctx.Send(partition.Actor, event.release());
39513977
}
39523978

0 commit comments

Comments
 (0)