Skip to content

Commit 2530f5d

Browse files
authored
Fixed errors of the distributed commit offset to the partition (#17423)
1 parent 9a6920b commit 2530f5d

File tree

11 files changed

+709
-612
lines changed

11 files changed

+709
-612
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2378,18 +2378,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23782378
bool isAffectedConsumer = AffectedUsers.contains(consumer);
23792379
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
23802380

2381-
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2382-
PQ_LOG_D("Partition " << Partition <<
2383-
" Consumer '" << consumer << "'" <<
2384-
" Bad request (session already dead) " <<
2385-
" RequestSessionId '" << operation.GetReadSessionId() <<
2386-
" CurrentSessionId '" << userInfo.Session <<
2387-
"'");
2388-
result = false;
2389-
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2381+
if (operation.GetOnlyCheckCommitedToFinish()) {
23902382
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
23912383
result = false;
23922384
}
2385+
} else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2386+
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) {
2387+
PQ_LOG_D("Partition " << Partition <<
2388+
" Consumer '" << consumer << "'" <<
2389+
" Bad request (session already dead) " <<
2390+
" RequestSessionId '" << operation.GetReadSessionId() <<
2391+
" CurrentSessionId '" << userInfo.Session <<
2392+
"'");
2393+
result = false;
2394+
}
23932395
} else {
23942396
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
23952397
PQ_LOG_D("Partition " << Partition <<
@@ -2423,6 +2425,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
24232425
consumers.insert(consumer);
24242426
}
24252427
}
2428+
24262429
if (result) {
24272430
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
24282431
}
@@ -2913,6 +2916,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
29132916
"incorrect offset range (begin > end)");
29142917
return EProcessResult::ContinueDrop;
29152918
}
2919+
29162920
consumers.insert(user);
29172921
}
29182922
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
@@ -2937,6 +2941,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29372941
return;
29382942
}
29392943
for (const auto& operation : record.GetData().GetOperations()) {
2944+
if (operation.GetOnlyCheckCommitedToFinish()) {
2945+
continue;
2946+
}
2947+
29402948
if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
29412949
continue; //Write operation - handled separately via WriteInfo
29422950
}
@@ -2977,6 +2985,21 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29772985
"incorrect offset range (commit to the future)");
29782986
return;
29792987
}
2988+
2989+
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) {
2990+
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) {
2991+
ScheduleReplyPropose(record,
2992+
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
2993+
NKikimrPQ::TError::BAD_REQUEST,
2994+
"session already dead");
2995+
return;
2996+
}
2997+
}
2998+
2999+
if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
3000+
continue; // this is stale request, answer ok for it
3001+
}
3002+
29803003
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
29813004
}
29823005
CommitWriteOperations(t);

ydb/core/persqueue/partition.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
273273
void ConsumeBlobQuota();
274274
void UpdateAfterWriteCounters(bool writeComplete);
275275

276-
277276
void UpdateUserInfoEndOffset(const TInstant& now);
278277
void UpdateWriteBufferIsFullState(const TInstant& now);
279278

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121

2222
#include "make_config.h"
2323

24+
template<>
25+
void Out<NKikimrPQ::TEvProposeTransactionResult_EStatus>(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) {
26+
out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v);
27+
}
28+
2429
namespace NKikimr::NPQ {
2530

2631
namespace NHelpers {
@@ -1007,7 +1012,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction
10071012

10081013
if (matcher.Status) {
10091014
UNIT_ASSERT(event->Record.HasStatus());
1010-
UNIT_ASSERT(*matcher.Status == event->Record.GetStatus());
1015+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus());
10111016
}
10121017
}
10131018

0 commit comments

Comments
 (0)