Skip to content

Fixed errors of the distributed commit offset to the partition #17423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 21, 2025
41 changes: 32 additions & 9 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2378,18 +2378,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
bool isAffectedConsumer = AffectedUsers.contains(consumer);
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);

if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "'" <<
" Bad request (session already dead) " <<
" RequestSessionId '" << operation.GetReadSessionId() <<
" CurrentSessionId '" << userInfo.Session <<
"'");
result = false;
} else if (operation.GetOnlyCheckCommitedToFinish()) {
if (operation.GetOnlyCheckCommitedToFinish()) {
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
result = false;
}
} else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "'" <<
" Bad request (session already dead) " <<
" RequestSessionId '" << operation.GetReadSessionId() <<
" CurrentSessionId '" << userInfo.Session <<
"'");
result = false;
}
} else {
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
PQ_LOG_D("Partition " << Partition <<
Expand Down Expand Up @@ -2423,6 +2425,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
consumers.insert(consumer);
}
}

if (result) {
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
}
Expand Down Expand Up @@ -2913,6 +2916,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
"incorrect offset range (begin > end)");
return EProcessResult::ContinueDrop;
}

consumers.insert(user);
}
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
Expand All @@ -2937,6 +2941,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
return;
}
for (const auto& operation : record.GetData().GetOperations()) {
if (operation.GetOnlyCheckCommitedToFinish()) {
continue;
}

if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
continue; //Write operation - handled separately via WriteInfo
}
Expand Down Expand Up @@ -2977,6 +2985,21 @@ void TPartition::ExecImmediateTx(TTransaction& t)
"incorrect offset range (commit to the future)");
return;
}

if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) {
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) {
ScheduleReplyPropose(record,
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
NKikimrPQ::TError::BAD_REQUEST,
"session already dead");
return;
}
}

if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
continue; // this is stale request, answer ok for it
}

pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
}
CommitWriteOperations(t);
Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
void ConsumeBlobQuota();
void UpdateAfterWriteCounters(bool writeComplete);


void UpdateUserInfoEndOffset(const TInstant& now);
void UpdateWriteBufferIsFullState(const TInstant& now);

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

#include "make_config.h"

template<>
void Out<NKikimrPQ::TEvProposeTransactionResult_EStatus>(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) {
out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v);
}

namespace NKikimr::NPQ {

namespace NHelpers {
Expand Down Expand Up @@ -1007,7 +1012,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction

if (matcher.Status) {
UNIT_ASSERT(event->Record.HasStatus());
UNIT_ASSERT(*matcher.Status == event->Record.GetStatus());
UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus());
}
}

Expand Down
Loading
Loading