Skip to content

Commit f9516c4

Browse files
Fixed errors with transactions in the PQ tablet (#12884) (#12903)
1 parent 9ccadbf commit f9516c4

File tree

9 files changed

+578
-150
lines changed

9 files changed

+578
-150
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,7 @@ ui32 THead::GetCount() const
660660

661661
//how much offsets before last batch and how much offsets in last batch
662662
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset);
663+
663664
return Batches.back().GetOffset() - Offset + Batches.back().GetCount();
664665
}
665666

@@ -933,16 +934,23 @@ auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
933934

934935
auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TFormedBlobInfo>
935936
{
937+
if (HeadSize + BlobsSize == 0) { //if nothing to compact at all
938+
NeedCompactHead = false;
939+
}
940+
936941
std::optional<TFormedBlobInfo> res;
937942
if (NeedCompactHead) {
938943
NeedCompactHead = false;
939-
GlueNewHead = false;
940944
res = CreateFormedBlob(0, false);
945+
946+
StartOffset = NewHead.Offset + NewHead.GetCount();
947+
NewHead.Clear();
948+
NewHead.Offset = StartOffset;
941949
}
942950

943951
TKey newKey(TKeyPrefix::TypeData,
944952
Partition,
945-
NewHead.Offset + oldKey.GetOffset(),
953+
StartOffset,
946954
oldKey.GetPartNo(),
947955
oldKey.GetCount(),
948956
oldKey.GetInternalPartsCount(),

ydb/core/persqueue/events/internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ struct TEvPQ {
819819
ui64 TxId;
820820
TVector<NKikimrPQ::TPartitionOperation> Operations;
821821
TActorId SupportivePartitionActor;
822+
bool ForceFalse = false;
822823
};
823824

824825
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {

ydb/core/persqueue/partition.cpp

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ ui64 TPartition::UserDataSize() const {
251251
return 0;
252252
}
253253

254-
// We assume that DataKyesBody contains an up-to-date set of blobs, their relevance is
254+
// We assume that DataKeysBody contains an up-to-date set of blobs, their relevance is
255255
// maintained by the background process. However, the last block may contain several irrelevant
256256
// messages. Because of them, we throw out the size of the entire blob.
257257
auto size = Size();
@@ -955,15 +955,13 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor
955955

956956
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
957957
{
958+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
959+
958960
PendingEvents.emplace_back(ev->ReleaseBase().Release());
959961
}
960962

961963
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
962964
{
963-
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit" <<
964-
" Step " << ev->Get()->Step <<
965-
", TxId " << ev->Get()->TxId);
966-
967965
PendingEvents.emplace_back(ev->ReleaseBase().Release());
968966
}
969967

@@ -974,9 +972,32 @@ void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContex
974972

975973
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
976974
{
977-
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig" <<
978-
" Step " << ev->Get()->Step <<
979-
", TxId " << ev->Get()->TxId);
975+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
976+
}
977+
978+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&)
979+
{
980+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoRequest");
981+
982+
Y_ABORT_UNLESS(IsSupportive());
983+
984+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
985+
}
986+
987+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&)
988+
{
989+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoResponse");
990+
991+
Y_ABORT_UNLESS(!IsSupportive());
992+
993+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
994+
}
995+
996+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&)
997+
{
998+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoError");
999+
1000+
Y_ABORT_UNLESS(!IsSupportive());
9801001

9811002
PendingEvents.emplace_back(ev->ReleaseBase().Release());
9821003
}
@@ -1075,7 +1096,9 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
10751096
}
10761097

10771098
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1099+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
10781100
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
1101+
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
10791102
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
10801103
"Write info requested while writes are not complete");
10811104
ctx.Send(ev->Sender, response);
@@ -1101,6 +1124,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
11011124
response->MessagesSizes = std::move(MessageSize.GetValues());
11021125
response->InputLags = std::move(SupportivePartitionTimeLag);
11031126

1127+
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse");
11041128
ctx.Send(ev->Sender, response);
11051129
}
11061130

@@ -1187,7 +1211,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
11871211

11881212

11891213
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
1190-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError");
1214+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
1215+
"Cookie " << ev->Get()->Cookie <<
1216+
", Message " << ev->Get()->Message);
11911217
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
11921218
}
11931219

@@ -1873,7 +1899,6 @@ void TPartition::ProcessCommitQueue() {
18731899
return this->ExecUserActionOrTransaction(event, request);
18741900
};
18751901
while (!UserActionAndTxPendingCommit.empty()) {
1876-
// UserActionAndTxPendingCommit.pop_front();
18771902
auto& front = UserActionAndTxPendingCommit.front();
18781903
auto state = ECommitState::Committed;
18791904
if (auto* tx = get_if<TSimpleSharedPtr<TTransaction>>(&front.Event)) {
@@ -2040,8 +2065,9 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
20402065
Y_ABORT_UNLESS(t->ChangeConfig);
20412066

20422067
Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig);
2043-
if (!FirstEvent)
2068+
if (!FirstEvent) {
20442069
return EProcessResult::Blocked;
2070+
}
20452071
ChangingConfig = true;
20462072
// Should remove this and add some id to TEvChangeConfig if we want to batch change of configs
20472073
t->State = ECommitState::Committed;
@@ -2197,6 +2223,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21972223
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
21982224

21992225
if (!t.WriteInfo->BodyKeys.empty()) {
2226+
bool needCompactHead =
2227+
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;
2228+
22002229
PartitionedBlob = TPartitionedBlob(Partition,
22012230
NewHead.Offset,
22022231
"", // SourceId
@@ -2206,7 +2235,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22062235
Head,
22072236
NewHead,
22082237
Parameters->HeadCleared, // headCleared
2209-
Head.PackedSize != 0, // needCompactHead
2238+
needCompactHead, // needCompactHead
22102239
MaxBlobSize);
22112240

22122241
for (auto& k : t.WriteInfo->BodyKeys) {
@@ -2217,9 +2246,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22172246
CompactedKeys.emplace_back(write->Key, write->Value.size());
22182247
ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get());
22192248
}
2249+
Parameters->CurOffset += k.Key.GetCount();
22202250
}
22212251

2222-
22232252
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
22242253
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
22252254
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
@@ -2230,17 +2259,15 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22302259
ctx);
22312260
}
22322261

2233-
const auto& last = t.WriteInfo->BodyKeys.back();
2234-
2235-
NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
2262+
NewHead.Clear();
2263+
NewHead.Offset = Parameters->CurOffset;
22362264
}
22372265

22382266
if (!t.WriteInfo->BlobsFromHead.empty()) {
22392267
auto& first = t.WriteInfo->BlobsFromHead.front();
22402268
NewHead.PartNo = first.GetPartNo();
22412269

2242-
Parameters->CurOffset = NewHead.Offset;
2243-
Parameters->HeadCleared = !t.WriteInfo->BodyKeys.empty();
2270+
Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();
22442271

22452272
PartitionedBlob = TPartitionedBlob(Partition,
22462273
NewHead.Offset,
@@ -2285,6 +2312,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22852312
}
22862313
}
22872314

2315+
Parameters->FirstCommitWriteOperations = false;
2316+
22882317
WriteInfosApplied.emplace_back(std::move(t.WriteInfo));
22892318
}
22902319

@@ -2561,6 +2590,8 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
25612590

25622591
void TPartition::ResendPendingEvents(const TActorContext& ctx)
25632592
{
2593+
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2594+
25642595
while (!PendingEvents.empty()) {
25652596
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
25662597
PendingEvents.pop_front();
@@ -3433,8 +3464,6 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
34333464

34343465
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
34353466
{
3436-
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3437-
34383467
Y_ABORT_UNLESS(IsSupportive());
34393468

34403469
PendingEvents.emplace_back(ev->ReleaseBase().Release());

ydb/core/persqueue/partition.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
416416
void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
417417
void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
418418
void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
419+
void HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx);
420+
void HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx);
421+
void HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx);
419422

420423
void ChangePlanStepAndTxId(ui64 step, ui64 txId);
421424

@@ -523,10 +526,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
523526
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
524527
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
525528
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
526-
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
529+
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, HandleOnInit);
527530

528-
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
529-
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
531+
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, HandleOnInit);
532+
HFuncTraced(TEvPQ::TEvGetWriteInfoError, HandleOnInit);
530533
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
531534
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
532535
default:
@@ -618,6 +621,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
618621
ui64 CurOffset;
619622
bool OldPartsCleared;
620623
bool HeadCleared;
624+
bool FirstCommitWriteOperations = true;
621625
};
622626

623627
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);

ydb/core/persqueue/partition_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
460460

461461
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
462462
{
463-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
463+
PQ_LOG_T("TPartition::Handle TEvHandleWriteResponse.");
464464
OnHandleWriteResponse(ctx);
465465
}
466466

0 commit comments

Comments
 (0)