Skip to content

Commit ad49fc0

Browse files
Fixed errors with transactions in the PQ tablet (#12884) (#12901)
1 parent 68b4343 commit ad49fc0

File tree

9 files changed

+578
-150
lines changed

9 files changed

+578
-150
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,7 @@ ui32 THead::GetCount() const
660660

661661
//how much offsets before last batch and how much offsets in last batch
662662
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset);
663+
663664
return Batches.back().GetOffset() - Offset + Batches.back().GetCount();
664665
}
665666

@@ -933,16 +934,23 @@ auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
933934

934935
auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TFormedBlobInfo>
935936
{
937+
if (HeadSize + BlobsSize == 0) { //if nothing to compact at all
938+
NeedCompactHead = false;
939+
}
940+
936941
std::optional<TFormedBlobInfo> res;
937942
if (NeedCompactHead) {
938943
NeedCompactHead = false;
939-
GlueNewHead = false;
940944
res = CreateFormedBlob(0, false);
945+
946+
StartOffset = NewHead.Offset + NewHead.GetCount();
947+
NewHead.Clear();
948+
NewHead.Offset = StartOffset;
941949
}
942950

943951
TKey newKey(TKeyPrefix::TypeData,
944952
Partition,
945-
NewHead.Offset + oldKey.GetOffset(),
953+
StartOffset,
946954
oldKey.GetPartNo(),
947955
oldKey.GetCount(),
948956
oldKey.GetInternalPartsCount(),

ydb/core/persqueue/events/internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ struct TEvPQ {
819819
ui64 TxId;
820820
TVector<NKikimrPQ::TPartitionOperation> Operations;
821821
TActorId SupportivePartitionActor;
822+
bool ForceFalse = false;
822823
};
823824

824825
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {

ydb/core/persqueue/partition.cpp

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ ui64 TPartition::UserDataSize() const {
251251
return 0;
252252
}
253253

254-
// We assume that DataKyesBody contains an up-to-date set of blobs, their relevance is
254+
// We assume that DataKeysBody contains an up-to-date set of blobs, their relevance is
255255
// maintained by the background process. However, the last block may contain several irrelevant
256256
// messages. Because of them, we throw out the size of the entire blob.
257257
auto size = Size();
@@ -938,15 +938,13 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor
938938

939939
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
940940
{
941+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
942+
941943
PendingEvents.emplace_back(ev->ReleaseBase().Release());
942944
}
943945

944946
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
945947
{
946-
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit" <<
947-
" Step " << ev->Get()->Step <<
948-
", TxId " << ev->Get()->TxId);
949-
950948
PendingEvents.emplace_back(ev->ReleaseBase().Release());
951949
}
952950

@@ -957,9 +955,32 @@ void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContex
957955

958956
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
959957
{
960-
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig" <<
961-
" Step " << ev->Get()->Step <<
962-
", TxId " << ev->Get()->TxId);
958+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
959+
}
960+
961+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&)
962+
{
963+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoRequest");
964+
965+
Y_ABORT_UNLESS(IsSupportive());
966+
967+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
968+
}
969+
970+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&)
971+
{
972+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoResponse");
973+
974+
Y_ABORT_UNLESS(!IsSupportive());
975+
976+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
977+
}
978+
979+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&)
980+
{
981+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoError");
982+
983+
Y_ABORT_UNLESS(!IsSupportive());
963984

964985
PendingEvents.emplace_back(ev->ReleaseBase().Release());
965986
}
@@ -1058,7 +1079,9 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
10581079
}
10591080

10601081
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1082+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
10611083
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
1084+
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
10621085
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
10631086
"Write info requested while writes are not complete");
10641087
ctx.Send(ev->Sender, response);
@@ -1084,6 +1107,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
10841107
response->MessagesSizes = std::move(MessageSize.GetValues());
10851108
response->InputLags = std::move(SupportivePartitionTimeLag);
10861109

1110+
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse");
10871111
ctx.Send(ev->Sender, response);
10881112
}
10891113

@@ -1170,7 +1194,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
11701194

11711195

11721196
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
1173-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError");
1197+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
1198+
"Cookie " << ev->Get()->Cookie <<
1199+
", Message " << ev->Get()->Message);
11741200
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
11751201
}
11761202

@@ -1856,7 +1882,6 @@ void TPartition::ProcessCommitQueue() {
18561882
return this->ExecUserActionOrTransaction(event, request);
18571883
};
18581884
while (!UserActionAndTxPendingCommit.empty()) {
1859-
// UserActionAndTxPendingCommit.pop_front();
18601885
auto& front = UserActionAndTxPendingCommit.front();
18611886
auto state = ECommitState::Committed;
18621887
if (auto* tx = get_if<TSimpleSharedPtr<TTransaction>>(&front.Event)) {
@@ -2023,8 +2048,9 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
20232048
Y_ABORT_UNLESS(t->ChangeConfig);
20242049

20252050
Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig);
2026-
if (!FirstEvent)
2051+
if (!FirstEvent) {
20272052
return EProcessResult::Blocked;
2053+
}
20282054
ChangingConfig = true;
20292055
// Should remove this and add some id to TEvChangeConfig if we want to batch change of configs
20302056
t->State = ECommitState::Committed;
@@ -2180,6 +2206,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21802206
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
21812207

21822208
if (!t.WriteInfo->BodyKeys.empty()) {
2209+
bool needCompactHead =
2210+
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;
2211+
21832212
PartitionedBlob = TPartitionedBlob(Partition,
21842213
NewHead.Offset,
21852214
"", // SourceId
@@ -2189,7 +2218,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21892218
Head,
21902219
NewHead,
21912220
Parameters->HeadCleared, // headCleared
2192-
Head.PackedSize != 0, // needCompactHead
2221+
needCompactHead, // needCompactHead
21932222
MaxBlobSize);
21942223

21952224
for (auto& k : t.WriteInfo->BodyKeys) {
@@ -2200,9 +2229,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22002229
CompactedKeys.emplace_back(write->Key, write->Value.size());
22012230
ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get());
22022231
}
2232+
Parameters->CurOffset += k.Key.GetCount();
22032233
}
22042234

2205-
22062235
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
22072236
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
22082237
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
@@ -2213,17 +2242,15 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22132242
ctx);
22142243
}
22152244

2216-
const auto& last = t.WriteInfo->BodyKeys.back();
2217-
2218-
NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
2245+
NewHead.Clear();
2246+
NewHead.Offset = Parameters->CurOffset;
22192247
}
22202248

22212249
if (!t.WriteInfo->BlobsFromHead.empty()) {
22222250
auto& first = t.WriteInfo->BlobsFromHead.front();
22232251
NewHead.PartNo = first.GetPartNo();
22242252

2225-
Parameters->CurOffset = NewHead.Offset;
2226-
Parameters->HeadCleared = !t.WriteInfo->BodyKeys.empty();
2253+
Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();
22272254

22282255
PartitionedBlob = TPartitionedBlob(Partition,
22292256
NewHead.Offset,
@@ -2268,6 +2295,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22682295
}
22692296
}
22702297

2298+
Parameters->FirstCommitWriteOperations = false;
2299+
22712300
WriteInfosApplied.emplace_back(std::move(t.WriteInfo));
22722301
}
22732302

@@ -2544,6 +2573,8 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
25442573

25452574
void TPartition::ResendPendingEvents(const TActorContext& ctx)
25462575
{
2576+
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2577+
25472578
while (!PendingEvents.empty()) {
25482579
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
25492580
PendingEvents.pop_front();
@@ -3416,8 +3447,6 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
34163447

34173448
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
34183449
{
3419-
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3420-
34213450
Y_ABORT_UNLESS(IsSupportive());
34223451

34233452
PendingEvents.emplace_back(ev->ReleaseBase().Release());

ydb/core/persqueue/partition.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
416416
void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
417417
void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
418418
void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
419+
void HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx);
420+
void HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx);
421+
void HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx);
419422

420423
void ChangePlanStepAndTxId(ui64 step, ui64 txId);
421424

@@ -523,10 +526,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
523526
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
524527
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
525528
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
526-
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
529+
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, HandleOnInit);
527530

528-
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
529-
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
531+
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, HandleOnInit);
532+
HFuncTraced(TEvPQ::TEvGetWriteInfoError, HandleOnInit);
530533
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
531534
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
532535
default:
@@ -618,6 +621,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
618621
ui64 CurOffset;
619622
bool OldPartsCleared;
620623
bool HeadCleared;
624+
bool FirstCommitWriteOperations = true;
621625
};
622626

623627
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);

ydb/core/persqueue/partition_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
460460

461461
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
462462
{
463-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
463+
PQ_LOG_T("TPartition::Handle TEvHandleWriteResponse.");
464464
OnHandleWriteResponse(ctx);
465465
}
466466

0 commit comments

Comments
 (0)