Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ ui32 THead::GetCount() const

//how much offsets before last batch and how much offsets in last batch
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset);

return Batches.back().GetOffset() - Offset + Batches.back().GetCount();
}

Expand Down Expand Up @@ -940,16 +941,23 @@ auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>

auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TFormedBlobInfo>
{
if (HeadSize + BlobsSize == 0) { //if nothing to compact at all
NeedCompactHead = false;
}

std::optional<TFormedBlobInfo> res;
if (NeedCompactHead) {
NeedCompactHead = false;
GlueNewHead = false;
res = CreateFormedBlob(0, false);

StartOffset = NewHead.Offset + NewHead.GetCount();
NewHead.Clear();
NewHead.Offset = StartOffset;
}

TKey newKey(TKeyPrefix::TypeData,
Partition,
NewHead.Offset + oldKey.GetOffset(),
StartOffset,
oldKey.GetPartNo(),
oldKey.GetCount(),
oldKey.GetInternalPartsCount(),
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ struct TEvPQ {
ui64 TxId;
TVector<NKikimrPQ::TPartitionOperation> Operations;
TActorId SupportivePartitionActor;
bool ForceFalse = false;
};

struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
Expand Down
69 changes: 49 additions & 20 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ ui64 TPartition::UserDataSize() const {
return 0;
}

// We assume that DataKyesBody contains an up-to-date set of blobs, their relevance is
// We assume that DataKeysBody contains an up-to-date set of blobs, their relevance is
// maintained by the background process. However, the last block may contain several irrelevant
// messages. Because of them, we throw out the size of the entire blob.
auto size = Size();
Expand Down Expand Up @@ -964,15 +964,13 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor

void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

Expand All @@ -983,9 +981,32 @@ void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContex

void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);
PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoRequest");

Y_ABORT_UNLESS(IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoResponse");

Y_ABORT_UNLESS(!IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvGetWriteInfoError");

Y_ABORT_UNLESS(!IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}
Expand Down Expand Up @@ -1084,7 +1105,9 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
}

void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
"Write info requested while writes are not complete");
ctx.Send(ev->Sender, response);
Expand All @@ -1110,6 +1133,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
response->MessagesSizes = std::move(MessageSize.GetValues());
response->InputLags = std::move(SupportivePartitionTimeLag);

PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoResponse");
ctx.Send(ev->Sender, response);
}

Expand Down Expand Up @@ -1204,7 +1228,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo


void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError");
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
"Cookie " << ev->Get()->Cookie <<
", Message " << ev->Get()->Message);
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
}

Expand Down Expand Up @@ -1890,7 +1916,6 @@ void TPartition::ProcessCommitQueue() {
return this->ExecUserActionOrTransaction(event, request);
};
while (!UserActionAndTxPendingCommit.empty()) {
// UserActionAndTxPendingCommit.pop_front();
auto& front = UserActionAndTxPendingCommit.front();
auto state = ECommitState::Committed;
if (auto* tx = get_if<TSimpleSharedPtr<TTransaction>>(&front.Event)) {
Expand Down Expand Up @@ -2059,8 +2084,9 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
Y_ABORT_UNLESS(t->ChangeConfig);

Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig);
if (!FirstEvent)
if (!FirstEvent) {
return EProcessResult::Blocked;
}
ChangingConfig = true;
// Should remove this and add some id to TEvChangeConfig if we want to batch change of configs
t->State = ECommitState::Committed;
Expand Down Expand Up @@ -2216,6 +2242,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);

if (!t.WriteInfo->BodyKeys.empty()) {
bool needCompactHead =
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;

PartitionedBlob = TPartitionedBlob(Partition,
NewHead.Offset,
"", // SourceId
Expand All @@ -2225,7 +2254,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
Head,
NewHead,
Parameters->HeadCleared, // headCleared
Head.PackedSize != 0, // needCompactHead
needCompactHead, // needCompactHead
MaxBlobSize);

for (auto& k : t.WriteInfo->BodyKeys) {
Expand All @@ -2236,9 +2265,9 @@ void TPartition::CommitWriteOperations(TTransaction& t)
CompactedKeys.emplace_back(write->Key, write->Value.size());
ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get());
}
Parameters->CurOffset += k.Key.GetCount();
}


PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
Expand All @@ -2249,17 +2278,15 @@ void TPartition::CommitWriteOperations(TTransaction& t)
ctx);
}

const auto& last = t.WriteInfo->BodyKeys.back();

NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
NewHead.Clear();
NewHead.Offset = Parameters->CurOffset;
}

if (!t.WriteInfo->BlobsFromHead.empty()) {
auto& first = t.WriteInfo->BlobsFromHead.front();
NewHead.PartNo = first.GetPartNo();

Parameters->CurOffset = NewHead.Offset;
Parameters->HeadCleared = !t.WriteInfo->BodyKeys.empty();
Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();

PartitionedBlob = TPartitionedBlob(Partition,
NewHead.Offset,
Expand Down Expand Up @@ -2304,6 +2331,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
}
}

Parameters->FirstCommitWriteOperations = false;

WriteInfosApplied.emplace_back(std::move(t.WriteInfo));
}

Expand Down Expand Up @@ -2580,6 +2609,8 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)

void TPartition::ResendPendingEvents(const TActorContext& ctx)
{
PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());

while (!PendingEvents.empty()) {
ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
PendingEvents.pop_front();
Expand Down Expand Up @@ -3452,8 +3483,6 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T

void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");

Y_ABORT_UNLESS(IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx);

void ChangePlanStepAndTxId(ui64 step, ui64 txId);

Expand Down Expand Up @@ -527,10 +530,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, HandleOnInit);

HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, HandleOnInit);
HFuncTraced(TEvPQ::TEvGetWriteInfoError, HandleOnInit);
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
default:
Expand Down Expand Up @@ -622,6 +625,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
ui64 CurOffset;
bool OldPartsCleared;
bool HeadCleared;
bool FirstCommitWriteOperations = true;
};

static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ void TPartition::OnHandleWriteResponse(const TActorContext& ctx)

void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
{
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
PQ_LOG_T("TPartition::Handle TEvHandleWriteResponse.");
OnHandleWriteResponse(ctx);
}

Expand Down
Loading
Loading