Skip to content

Commit bf4a9ba

Browse files
authored
Merge 7d3ba05 into 62614f2
2 parents 62614f2 + 7d3ba05 commit bf4a9ba

File tree

8 files changed

+732
-648
lines changed

8 files changed

+732
-648
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 89 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2343,6 +2343,64 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
23432343
return true;
23442344
}
23452345

2346+
std::pair<TString, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) {
2347+
const TString& consumer = operation.GetConsumer();
2348+
2349+
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
2350+
PQ_LOG_D("Partition " << Partition <<
2351+
" Consumer '" << consumer << "' has been removed");
2352+
return {TStringBuilder() << "Consumer '" << consumer << "' has been removed", false};
2353+
}
2354+
2355+
if (!UsersInfoStorage->GetIfExists(consumer)) {
2356+
PQ_LOG_D("Partition " << Partition <<
2357+
" Unknown consumer '" << consumer << "'");
2358+
return {TStringBuilder() << "Unknown consumer '" << consumer << "'", false};
2359+
}
2360+
2361+
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
2362+
2363+
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2364+
PQ_LOG_D("Partition " << Partition <<
2365+
" Consumer '" << consumer << "'" <<
2366+
" Bad request (session already dead) " <<
2367+
" RequestSessionId '" << operation.GetReadSessionId() <<
2368+
"' CurrentSessionId '" << userInfo.Session <<
2369+
"'");
2370+
return {"Bad request (session already dead)", false};
2371+
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2372+
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
2373+
return {TStringBuilder() << "There are uncommitted messages in partition " << Partition.OriginalPartitionId, false};
2374+
} else {
2375+
return {"", false};
2376+
}
2377+
} else {
2378+
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
2379+
PQ_LOG_D("Partition " << Partition <<
2380+
" Consumer '" << consumer << "'" <<
2381+
" Bad request (invalid range) " <<
2382+
" Begin " << operation.GetCommitOffsetsBegin() <<
2383+
" End " << operation.GetCommitOffsetsEnd());
2384+
return {"Bad request (invalid range)", true};
2385+
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
2386+
PQ_LOG_D("Partition " << Partition <<
2387+
" Consumer '" << consumer << "'" <<
2388+
" Bad request (gap) " <<
2389+
" Offset " << userInfo.Offset <<
2390+
" Begin " << operation.GetCommitOffsetsBegin());
2391+
return {"Bad request (gap)", true};
2392+
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
2393+
PQ_LOG_D("Partition " << Partition <<
2394+
" Consumer '" << consumer << "'" <<
2395+
" Bad request (behind the last offset) " <<
2396+
" EndOffset " << EndOffset <<
2397+
" End " << operation.GetCommitOffsetsEnd());
2398+
return {"Bad request (behind the last offset", true};
2399+
}
2400+
return {"", true};
2401+
}
2402+
}
2403+
23462404
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
23472405
{
23482406
if (tx.ForcePredicateFalse) {
@@ -2361,68 +2419,25 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23612419
return EProcessResult::Blocked;
23622420
}
23632421

2364-
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
2365-
PQ_LOG_D("Partition " << Partition <<
2366-
" Consumer '" << consumer << "' has been removed");
2367-
result = false;
2368-
break;
2369-
}
2370-
2371-
if (!UsersInfoStorage->GetIfExists(consumer)) {
2372-
PQ_LOG_D("Partition " << Partition <<
2373-
" Unknown consumer '" << consumer << "'");
2374-
result = false;
2375-
break;
2376-
}
2377-
2378-
bool isAffectedConsumer = AffectedUsers.contains(consumer);
2379-
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
2380-
2381-
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2382-
PQ_LOG_D("Partition " << Partition <<
2383-
" Consumer '" << consumer << "'" <<
2384-
" Bad request (session already dead) " <<
2385-
" RequestSessionId '" << operation.GetReadSessionId() <<
2386-
" CurrentSessionId '" << userInfo.Session <<
2387-
"'");
2388-
result = false;
2389-
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2390-
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
2391-
result = false;
2392-
}
2393-
} else {
2394-
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
2395-
PQ_LOG_D("Partition " << Partition <<
2396-
" Consumer '" << consumer << "'" <<
2397-
" Bad request (invalid range) " <<
2398-
" Begin " << operation.GetCommitOffsetsBegin() <<
2399-
" End " << operation.GetCommitOffsetsEnd());
2400-
result = false;
2401-
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
2402-
PQ_LOG_D("Partition " << Partition <<
2403-
" Consumer '" << consumer << "'" <<
2404-
" Bad request (gap) " <<
2405-
" Offset " << userInfo.Offset <<
2406-
" Begin " << operation.GetCommitOffsetsBegin());
2407-
result = false;
2408-
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
2409-
PQ_LOG_D("Partition " << Partition <<
2410-
" Consumer '" << consumer << "'" <<
2411-
" Bad request (behind the last offset) " <<
2412-
" EndOffset " << EndOffset <<
2413-
" End " << operation.GetCommitOffsetsEnd());
2414-
result = false;
2415-
}
2422+
auto [error, real] = ValidatePartitionOperation(operation);
2423+
result = error.empty();
24162424

2425+
if (real) {
24172426
if (!result) {
2427+
bool isAffectedConsumer = AffectedUsers.contains(consumer);
2428+
24182429
if (!isAffectedConsumer) {
24192430
AffectedUsers.erase(consumer);
24202431
}
24212432
break;
24222433
}
24232434
consumers.insert(consumer);
24242435
}
2436+
if (!result) {
2437+
break;
2438+
}
24252439
}
2440+
24262441
if (result) {
24272442
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
24282443
}
@@ -2913,7 +2928,19 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
29132928
"incorrect offset range (begin > end)");
29142929
return EProcessResult::ContinueDrop;
29152930
}
2916-
consumers.insert(user);
2931+
2932+
auto [error, real] = ValidatePartitionOperation(operation);
2933+
if (!error.empty()) {
2934+
ScheduleReplyPropose(tx,
2935+
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
2936+
NKikimrPQ::TError::BAD_REQUEST,
2937+
error);
2938+
return EProcessResult::ContinueDrop;
2939+
}
2940+
2941+
if (real) {
2942+
consumers.insert(user);
2943+
}
29172944
}
29182945
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
29192946
WriteKeysSizeEstimate += consumers.size();
@@ -2937,6 +2964,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29372964
return;
29382965
}
29392966
for (const auto& operation : record.GetData().GetOperations()) {
2967+
if (operation.GetOnlyCheckCommitedToFinish()) {
2968+
continue;
2969+
}
2970+
29402971
if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
29412972
continue; //Write operation - handled separately via WriteInfo
29422973
}
@@ -2977,6 +3008,11 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29773008
"incorrect offset range (commit to the future)");
29783009
return;
29793010
}
3011+
3012+
if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
3013+
continue; // this is stale request, answer ok for it
3014+
}
3015+
29803016
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
29813017
}
29823018
CommitWriteOperations(t);

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
273273
void ConsumeBlobQuota();
274274
void UpdateAfterWriteCounters(bool writeComplete);
275275

276-
276+
std::pair<TString, bool> ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation);
277277
void UpdateUserInfoEndOffset(const TInstant& now);
278278
void UpdateWriteBufferIsFullState(const TInstant& now);
279279

0 commit comments

Comments
 (0)