Skip to content

Commit c16c326

Browse files
Merge 76e77cf into ef48b00
2 parents ef48b00 + 76e77cf commit c16c326

File tree

15 files changed

+664
-214
lines changed

15 files changed

+664
-214
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,7 @@ ui32 THead::GetCount() const
667667

668668
//how much offsets before last batch and how much offsets in last batch
669669
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset);
670+
670671
return Batches.back().GetOffset() - Offset + Batches.back().GetCount();
671672
}
672673

@@ -940,16 +941,23 @@ auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
940941

941942
auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TFormedBlobInfo>
942943
{
944+
if (HeadSize + BlobsSize == 0) { //if nothing to compact at all
945+
NeedCompactHead = false;
946+
}
947+
943948
std::optional<TFormedBlobInfo> res;
944949
if (NeedCompactHead) {
945950
NeedCompactHead = false;
946-
GlueNewHead = false;
947951
res = CreateFormedBlob(0, false);
952+
953+
StartOffset = NewHead.Offset + NewHead.GetCount();
954+
NewHead.Clear();
955+
NewHead.Offset = StartOffset;
948956
}
949957

950958
TKey newKey(TKeyPrefix::TypeData,
951959
Partition,
952-
NewHead.Offset + oldKey.GetOffset(),
960+
StartOffset,
953961
oldKey.GetPartNo(),
954962
oldKey.GetCount(),
955963
oldKey.GetInternalPartsCount(),

ydb/core/persqueue/events/internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,7 @@ struct TEvPQ {
816816
ui64 TxId;
817817
TVector<NKikimrPQ::TPartitionOperation> Operations;
818818
TActorId SupportivePartitionActor;
819+
bool ForceFalse = false;
819820
};
820821

821822
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {

ydb/core/persqueue/partition.cpp

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ ui64 TPartition::UserDataSize() const {
251251
return 0;
252252
}
253253

254-
// We assume that DataKyesBody contains an up-to-date set of blobs, their relevance is
254+
// We assume that DataKeysBody contains an up-to-date set of blobs, their relevance is
255255
// maintained by the background process. However, the last block may contain several irrelevant
256256
// messages. Because of them, we throw out the size of the entire blob.
257257
auto size = Size();
@@ -964,15 +964,13 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor
964964

965965
void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
966966
{
967+
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");
968+
967969
PendingEvents.emplace_back(ev->ReleaseBase().Release());
968970
}
969971

970972
void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
971973
{
972-
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit" <<
973-
" Step " << ev->Get()->Step <<
974-
", TxId " << ev->Get()->TxId);
975-
976974
PendingEvents.emplace_back(ev->ReleaseBase().Release());
977975
}
978976

@@ -983,9 +981,32 @@ void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContex
983981

984982
void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
985983
{
986-
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig" <<
987-
" Step " << ev->Get()->Step <<
988-
", TxId " << ev->Get()->TxId);
984+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
985+
}
986+
987+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&)
988+
{
989+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoRequest");
990+
991+
Y_ABORT_UNLESS(IsSupportive());
992+
993+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
994+
}
995+
996+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&)
997+
{
998+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoResponse");
999+
1000+
Y_ABORT_UNLESS(!IsSupportive());
1001+
1002+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
1003+
}
1004+
1005+
void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&)
1006+
{
1007+
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoError");
1008+
1009+
Y_ABORT_UNLESS(!IsSupportive());
9891010

9901011
PendingEvents.emplace_back(ev->ReleaseBase().Release());
9911012
}
@@ -1084,7 +1105,9 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
10841105
}
10851106

10861107
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
1108+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
10871109
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
1110+
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
10881111
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
10891112
"Write info requested while writes are not complete");
10901113
ctx.Send(ev->Sender, response);
@@ -1110,6 +1133,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
11101133
response->MessagesSizes = std::move(MessageSize.GetValues());
11111134
response->InputLags = std::move(SupportivePartitionTimeLag);
11121135

1136+
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse");
11131137
ctx.Send(ev->Sender, response);
11141138
}
11151139

@@ -1204,7 +1228,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
12041228

12051229

12061230
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
1207-
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError");
1231+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
1232+
"Cookie " << ev->Get()->Cookie <<
1233+
", Message " << ev->Get()->Message);
12081234
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
12091235
}
12101236

@@ -1890,7 +1916,6 @@ void TPartition::ProcessCommitQueue() {
18901916
return this->ExecUserActionOrTransaction(event, request);
18911917
};
18921918
while (!UserActionAndTxPendingCommit.empty()) {
1893-
// UserActionAndTxPendingCommit.pop_front();
18941919
auto& front = UserActionAndTxPendingCommit.front();
18951920
auto state = ECommitState::Committed;
18961921
if (auto* tx = get_if<TSimpleSharedPtr<TTransaction>>(&front.Event)) {
@@ -2059,8 +2084,9 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
20592084
Y_ABORT_UNLESS(t->ChangeConfig);
20602085

20612086
Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig);
2062-
if (!FirstEvent)
2087+
if (!FirstEvent) {
20632088
return EProcessResult::Blocked;
2089+
}
20642090
ChangingConfig = true;
20652091
// Should remove this and add some id to TEvChangeConfig if we want to batch change of configs
20662092
t->State = ECommitState::Committed;
@@ -2216,6 +2242,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22162242
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
22172243

22182244
if (!t.WriteInfo->BodyKeys.empty()) {
2245+
bool needCompactHead =
2246+
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;
2247+
22192248
PartitionedBlob = TPartitionedBlob(Partition,
22202249
NewHead.Offset,
22212250
"", // SourceId
@@ -2225,7 +2254,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22252254
Head,
22262255
NewHead,
22272256
Parameters->HeadCleared, // headCleared
2228-
Head.PackedSize != 0, // needCompactHead
2257+
needCompactHead, // needCompactHead
22292258
MaxBlobSize);
22302259

22312260
for (auto& k : t.WriteInfo->BodyKeys) {
@@ -2236,9 +2265,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22362265
CompactedKeys.emplace_back(write->Key, write->Value.size());
22372266
ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get());
22382267
}
2268+
Parameters->CurOffset += k.Key.GetCount();
22392269
}
22402270

2241-
22422271
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
22432272
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
22442273
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
@@ -2249,17 +2278,15 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22492278
ctx);
22502279
}
22512280

2252-
const auto& last = t.WriteInfo->BodyKeys.back();
2253-
2254-
NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
2281+
NewHead.Clear();
2282+
NewHead.Offset = Parameters->CurOffset;
22552283
}
22562284

22572285
if (!t.WriteInfo->BlobsFromHead.empty()) {
22582286
auto& first = t.WriteInfo->BlobsFromHead.front();
22592287
NewHead.PartNo = first.GetPartNo();
22602288

2261-
Parameters->CurOffset = NewHead.Offset;
2262-
Parameters->HeadCleared = !t.WriteInfo->BodyKeys.empty();
2289+
Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();
22632290

22642291
PartitionedBlob = TPartitionedBlob(Partition,
22652292
NewHead.Offset,
@@ -2304,6 +2331,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
23042331
}
23052332
}
23062333

2334+
Parameters->FirstCommitWriteOperations = false;
2335+
23072336
WriteInfosApplied.emplace_back(std::move(t.WriteInfo));
23082337
}
23092338

@@ -2580,6 +2609,8 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
25802609

25812610
void TPartition::ResendPendingEvents(const TActorContext& ctx)
25822611
{
2612+
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());
2613+
25832614
while (!PendingEvents.empty()) {
25842615
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
25852616
PendingEvents.pop_front();
@@ -3452,8 +3483,6 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
34523483

34533484
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
34543485
{
3455-
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3456-
34573486
Y_ABORT_UNLESS(IsSupportive());
34583487

34593488
PendingEvents.emplace_back(ev->ReleaseBase().Release());

ydb/core/persqueue/partition.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
417417
void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
418418
void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
419419
void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
420+
void HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx);
421+
void HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx);
422+
void HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx);
420423

421424
void ChangePlanStepAndTxId(ui64 step, ui64 txId);
422425

@@ -527,10 +530,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
527530
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
528531
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
529532
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
530-
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
533+
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, HandleOnInit);
531534

532-
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
533-
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
535+
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, HandleOnInit);
536+
HFuncTraced(TEvPQ::TEvGetWriteInfoError, HandleOnInit);
534537
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
535538
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
536539
default:
@@ -622,6 +625,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
622625
ui64 CurOffset;
623626
bool OldPartsCleared;
624627
bool HeadCleared;
628+
bool FirstCommitWriteOperations = true;
625629
};
626630

627631
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);

ydb/core/persqueue/partition_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
469469

470470
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
471471
{
472-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
472+
PQ_LOG_T("TPartition::Handle TEvHandleWriteResponse.");
473473
OnHandleWriteResponse(ctx);
474474
}
475475

0 commit comments

Comments
 (0)