Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ void TBatch::Pack() {
Header.SetPayloadSize(PackedData.size());
}

for (auto& b : Blobs) {
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
}


TVector<TClientBlob> tmp;
Blobs.swap(tmp);
Expand All @@ -414,11 +418,14 @@ void TBatch::Unpack() {
UnpackTo(&Blobs);
Y_ABORT_UNLESS(InternalPartsPos.empty());
for (ui32 i = 0; i < Blobs.size(); ++i) {
if (!Blobs[i].IsLastPart())
auto& b = Blobs[i];
if (!b.IsLastPart()) {
InternalPartsPos.push_back(i);
}
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
}
Y_ABORT_UNLESS(InternalPartsPos.size() == GetInternalPartsCount());

PackedData.Clear();
}

Expand Down Expand Up @@ -978,4 +985,3 @@ bool TPartitionedBlob::IsNextPart(const TString& sourceId, const ui64 seqNo, con

}// NPQ
}// NKikimr

20 changes: 18 additions & 2 deletions ydb/core/persqueue/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ struct TClientBlob {
void SerializeTo(TBuffer& buffer) const;
static TClientBlob Deserialize(const char *data, ui32 size);

static void CheckBlob(const TKey& key, const TString& blob);
static void CheckBlob(const TKey& key, const TString& blob);
};

static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
Expand All @@ -121,6 +121,7 @@ struct TBatch {
TVector<ui32> InternalPartsPos;
NKikimrPQ::TBatchHeader Header;
TBuffer PackedData;
TInstant EndWriteTimestamp;

TBatch()
: Packed(false)
Expand Down Expand Up @@ -162,27 +163,42 @@ struct TBatch {
Header.SetUnpackedSize(unpackedSize);
Header.SetCount(count);
Header.SetInternalPartsCount(InternalPartsPos.size());

EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
}

ui64 GetOffset() const {
return Header.GetOffset();
}

ui16 GetPartNo() const {
return Header.GetPartNo();
}

ui32 GetUnpackedSize() const {
return Header.GetUnpackedSize();
}

ui32 GetCount() const {
return Header.GetCount();
}

ui16 GetInternalPartsCount() const {
return Header.GetInternalPartsCount();
}

bool IsGreaterThan(ui64 offset, ui16 partNo) const {
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
}

bool Empty() const {
return Blobs.empty();
}

TInstant GetEndWriteTimestamp() const {
return EndWriteTimestamp;
}

TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
: Packed(true)
, Header(header)
Expand Down Expand Up @@ -239,7 +255,7 @@ struct THead {
private:
std::deque<TBatch> Batches;
ui16 InternalPartsCount = 0;

friend class TPartitionedBlob;

class TBatchAccessor {
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,14 @@ ui64 TPartition::ImportantClientsMinOffset() const {
return minOffset;
}

TInstant TPartition::GetEndWriteTimestamp() const {
return EndWriteTimestamp;
}

THead& TPartition::GetHead() {
return Head;
}

void TPartition::HandleWakeup(const TActorContext& ctx) {
FilterDeadlinedWrites(ctx);

Expand Down Expand Up @@ -356,6 +364,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
meta.SetStartOffset(StartOffset);
meta.SetEndOffset(Max(NewHead.GetNextOffset(), EndOffset));
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
meta.SetEndWriteTimestamp(PendingWriteTimestamp.MilliSeconds());

if (IsSupportive()) {
auto* counterData = meta.MutableCounterData();
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
friend TInitInfoRangeStep;
friend TInitDataRangeStep;
friend TInitDataStep;
friend TInitEndWriteTimestampStep;

friend TPartitionSourceManager;

Expand Down Expand Up @@ -440,6 +441,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);

ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -470,6 +472,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
// Minimal offset, the data from which cannot be deleted, because it is required by an important consumer
ui64 ImportantClientsMinOffset() const;

TInstant GetEndWriteTimestamp() const; // For tests only
THead& GetHead(); // For tests only

//Bootstrap sends kvRead
//Become StateInit
Expand Down Expand Up @@ -666,6 +670,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
// [DataKeysBody ][DataKeysHead ]
ui64 StartOffset;
ui64 EndOffset;
TInstant EndWriteTimestamp;
TInstant PendingWriteTimestamp;

ui64 WriteInflightSize;
TActorId Tablet;
Expand Down
54 changes: 45 additions & 9 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ TInitializer::TInitializer(TPartition* partition)
Steps.push_back(MakeHolder<TInitInfoRangeStep>(this));
Steps.push_back(MakeHolder<TInitDataRangeStep>(this));
Steps.push_back(MakeHolder<TInitDataStep>(this));
Steps.push_back(MakeHolder<TInitEndWriteTimestampStep>(this));

CurrentStep = Steps.begin();
}
Expand Down Expand Up @@ -308,14 +309,14 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
bool res = meta.ParseFromString(response.GetValue());
Y_ABORT_UNLESS(res);

/* Bring back later, when switch to 21-2 will be unable
StartOffset = meta.GetStartOffset();
EndOffset = meta.GetEndOffset();
if (StartOffset == EndOffset) {
NewHead.Offset = Head.Offset = EndOffset;
}
*/
Partition()->StartOffset = meta.GetStartOffset();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Y_ABORT_UNLESS that checks old StartOffset and this StartOffset. And the same for EndOffset. At this point offsets must be equal. You need to remove this ABORT only when empty partitions with StartOffset > 0 will be acceptable.

Partition()->EndOffset = meta.GetEndOffset();
if (Partition()->StartOffset == Partition()->EndOffset) {
Partition()->NewHead.Offset = Partition()->Head.Offset = Partition()->EndOffset;
}
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetEndWriteTimestamp());
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
if (Partition()->IsSupportive()) {
const auto& counterData = meta.GetCounterData();
Partition()->BytesWrittenGrpc.SetSavedValue(counterData.GetBytesWrittenGrpc());
Expand Down Expand Up @@ -495,7 +496,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
if (k.GetPartNo() > 0) ++startOffset;
head.PartNo = 0;
} else {
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%s", pair.GetKey().c_str());
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%" PRIu64 " <= %" PRIu64 " %s", endOffset, k.GetOffset(), pair.GetKey().c_str());
if (endOffset < k.GetOffset()) {
gapOffsets.push_back(std::make_pair(endOffset, k.GetOffset()));
gapSize += k.GetOffset() - endOffset;
Expand Down Expand Up @@ -619,7 +620,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte

Y_ABORT_UNLESS(offset + 1 >= Partition()->StartOffset);
Y_ABORT_UNLESS(offset < Partition()->EndOffset);
Y_ABORT_UNLESS(size == read.GetValue().size());
Y_ABORT_UNLESS(size == read.GetValue().size(), "size=%d == read.GetValue().size() = %d", size, read.GetValue().size());

for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
head.AddBatch(it.GetBatch());
Expand Down Expand Up @@ -653,6 +654,41 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
}


//
// TInitEndWriteTimestampStep
//

TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer)
: TInitializerStep(initializer, "TInitEndWriteTimestampStep", true) {
}

void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->HeadKeys.empty() && Partition()->DataKeysBody.empty())) {
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
<< "' partition " << Partition()->Partition
<< " skiped because already initialized.");
return Done(ctx);
}

TDataKey* lastKey = nullptr;
if (!Partition()->HeadKeys.empty()) {
lastKey = &Partition()->HeadKeys.back();
} else if (!Partition()->DataKeysBody.empty()) {
lastKey = &Partition()->DataKeysBody.back();
}

if (lastKey) {
Partition()->EndWriteTimestamp = lastKey->Timestamp;
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
}

PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
<< "' partition " << Partition()->Partition
<< " from keys completed. Value " << Partition()->EndWriteTimestamp);

return Done(ctx);
}

//
// TPartition
//
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/persqueue/partition_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,11 @@ class TInitDataStep: public TBaseKVStep {
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
};

class TInitEndWriteTimestampStep: public TInitializerStep {
public:
TInitEndWriteTimestampStep(TInitializer* initializer);

void Execute(const TActorContext& ctx) override;
};

} // NKikimr::NPQ
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition_monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
PROPERTY("StartOffset", StartOffset);
PROPERTY("EndOffset", EndOffset);
PROPERTY("LastOffset", Head.GetNextOffset());
PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
}
}
Expand Down
35 changes: 27 additions & 8 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ namespace NKikimr::NPQ {

static const ui32 MAX_USER_ACTS = 1000;

TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
return {};
}

TInstant timestamp = maxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(maxTimeLagMs) : TInstant::Zero();
timestamp = Max(timestamp, TInstant::MilliSeconds(readTimestampMs));
timestamp = Max(timestamp, consumerReadFromTimestamp);
return timestamp;
}

ui64 TPartition::GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const {
if (!readTimestamp) {
return offset;
}
return Max(GetOffsetEstimate(DataKeysBody, *readTimestamp, Min(Head.Offset, EndOffset - 1)), offset);
}

void TPartition::SendReadingFinished(const TString& consumer) {
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie));
}
Expand Down Expand Up @@ -133,7 +151,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
};

for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
if (request->Offset < EndOffset) {
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
ctx.Send(request->Sender, response.Release());
} else if (!IsActive()) {
Expand Down Expand Up @@ -170,16 +188,18 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont

auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();

auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO what?


TActorId sender = ActorIdFromProto(record.GetSender());
if (InitDone && EndOffset > (ui64)record.GetOffset()) { //already has data, answer right now
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
ctx.Send(sender, response.Release());
} else if (InitDone && !IsActive()) {
auto response = MakeHasDataInfoResponse(0, cookie, true);
ctx.Send(sender, response.Release());
} else {
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
record.HasClientId() && InitDone ? record.GetClientId() : ""};
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
auto res = HasDataRequests.insert(req);
HasDataDeadlines.insert(dl);
Expand Down Expand Up @@ -763,11 +783,10 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
}
userInfo->ReadsInQuotaQueue--;
ui64 offset = read->Offset;
if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) {
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
timestamp = Max(timestamp, userInfo->ReadFromTimestamp);
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);

auto readTimestamp = GetReadFrom(read->MaxTimeLagMs, read->ReadTimestampMs, userInfo->ReadFromTimestamp, ctx);
if (read->PartNo == 0 && readTimestamp) {
offset = GetReadOffset(offset, readTimestamp);
userInfo->ReadOffsetRewindSum += offset - read->Offset;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ struct TPartition::THasDataReq {
TActorId Sender;
TMaybe<ui64> Cookie;
TString ClientId;
TMaybe<TInstant> ReadTimestamp;

bool operator < (const THasDataReq& req) const {
return Num < req.Num;
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,9 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
PQ_LOG_T("TPartition::SyncMemoryStateWithKVState.");

if (!CompactedKeys.empty())
if (!CompactedKeys.empty()) {
HeadKeys.clear();
}

if (NewHeadKey.Size > 0) {
while (!HeadKeys.empty() &&
Expand Down Expand Up @@ -437,6 +438,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
}

EndOffset = Head.GetNextOffset();
EndWriteTimestamp = PendingWriteTimestamp;

NewHead.Clear();
NewHead.Offset = EndOffset;

Expand Down Expand Up @@ -1397,12 +1400,15 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
Head.GetBatch(pp).SerializeTo(valueD);
auto& batch = Head.GetBatch(pp);
batch.SerializeTo(valueD);
PendingWriteTimestamp = std::max(PendingWriteTimestamp, batch.GetEndWriteTimestamp());
}
}
for (auto& b : NewHead.GetBatches()) {
Y_ABORT_UNLESS(b.Packed);
b.SerializeTo(valueD);
PendingWriteTimestamp = std::max(PendingWriteTimestamp, b.GetEndWriteTimestamp());
}

Y_ABORT_UNLESS(res.second >= valueD.size());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/read_balancer__balancing_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void TBalancer::RenderApp(NApp::TNavigationBar& __navigationBar) const {
}
TABLER() {
TABLED() { }
TABLED() { __stream << "<__streamong>Total:</__streamong>"; }
TABLED() { __stream << "<strong>Total:</strong>"; }
TABLED() { }
TABLED() { __stream << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; }
TABLED() { __stream << (activePartitionCount + inactivePartitionCount + releasingPartitionCount) << " / " << activePartitionCount << " / "
Expand Down
Loading
Loading