24-3: optimized batch processing in Topics #10139


Merged
90 changes: 72 additions & 18 deletions ydb/core/persqueue/blob.cpp
@@ -422,7 +422,7 @@ void TBatch::Unpack() {
PackedData.Clear();
}

void TBatch::UnpackTo(TVector<TClientBlob> *blobs)
void TBatch::UnpackTo(TVector<TClientBlob> *blobs) const
{
Y_ABORT_UNLESS(PackedData.size());
auto type = Header.GetFormat();
@@ -446,7 +446,7 @@ NScheme::TDataRef GetChunk(const char*& data, const char *end)
return NScheme::TDataRef(data - size, size);
}

void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
Y_ABORT_UNLESS(Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed);
Y_ABORT_UNLESS(PackedData.size());
ui32 totalBlobs = Header.GetCount() + Header.GetInternalPartsCount();
@@ -606,7 +606,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
}
}

void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) {
void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) const {
Y_ABORT_UNLESS(Header.GetFormat() == NKikimrPQ::TBatchHeader::EUncompressed);
Y_ABORT_UNLESS(PackedData.size());
ui32 shift = 0;
@@ -640,7 +640,7 @@ ui32 TBatch::FindPos(const ui64 offset, const ui16 partNo) const {
void THead::Clear()
{
Offset = PartNo = PackedSize = 0;
Batches.clear();
ClearBatches();
}

ui64 THead::GetNextOffset() const
@@ -650,11 +650,7 @@ ui64 THead::GetNextOffset() const

ui16 THead::GetInternalPartsCount() const
{
ui16 res = 0;
for (auto& b : Batches) {
res += b.GetInternalPartsCount();
}
return res;
return InternalPartsCount;
}

ui32 THead::GetCount() const
@@ -675,15 +671,73 @@ IOutputStream& operator <<(IOutputStream& out, const THead& value)
}

ui32 THead::FindPos(const ui64 offset, const ui16 partNo) const {
ui32 i = 0;
for (; i < Batches.size(); ++i) {
//this batch contains blobs with position bigger than requested
if (Batches[i].GetOffset() > offset || Batches[i].GetOffset() == offset && Batches[i].GetPartNo() > partNo)
break;
}
if (i == 0)
if (Batches.empty()) {
return Max<ui32>();
return i - 1;
}

ui32 i = Batches.size() - 1;
while (i > 0 && Batches[i].IsGreaterThan(offset, partNo)) {
--i;
}

if (i == 0) {
if (Batches[i].IsGreaterThan(offset, partNo)) {
return Max<ui32>();
} else {
return 0;
}
}

return i;
}

void THead::AddBatch(const TBatch& batch) {
auto& b = Batches.emplace_back(batch);
InternalPartsCount += b.GetInternalPartsCount();
}

void THead::ClearBatches() {
Batches.clear();
InternalPartsCount = 0;
}

const std::deque<TBatch>& THead::GetBatches() const {
return Batches;
}

const TBatch& THead::GetBatch(ui32 idx) const {
return Batches.at(idx);
}

const TBatch& THead::GetLastBatch() const {
Y_ABORT_UNLESS(!Batches.empty());
return Batches.back();
}

TBatch THead::ExtractFirstBatch() {
Y_ABORT_UNLESS(!Batches.empty());
auto batch = std::move(Batches.front());
InternalPartsCount -= batch.GetInternalPartsCount();
Batches.pop_front();
return batch;
}

THead::TBatchAccessor THead::MutableBatch(ui32 idx) {
Y_ABORT_UNLESS(idx < Batches.size());
return TBatchAccessor(Batches[idx]);
}

THead::TBatchAccessor THead::MutableLastBatch() {
Y_ABORT_UNLESS(!Batches.empty());
return TBatchAccessor(Batches.back());
}

void THead::AddBlob(const TClientBlob& blob) {
Y_ABORT_UNLESS(!Batches.empty());
auto& batch = Batches.back();
InternalPartsCount -= batch.GetInternalPartsCount();
batch.AddBlob(blob);
InternalPartsCount += batch.GetInternalPartsCount();
}

TPartitionedBlob::TRenameFormedBlobInfo::TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size) :
@@ -832,7 +886,7 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio

GlueHead = GlueNewHead = false;
if (!Blobs.empty()) {
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
auto batch = TBatch::FromBlobs(Offset, std::move(Blobs));
Blobs.clear();
batch.Pack();
Y_ABORT_UNLESS(batch.Packed);
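Note on the rewritten THead::FindPos above: it now walks the batches from the back using TBatch::IsGreaterThan and returns the index of the last batch that starts at or before the requested (offset, partNo), or Max<ui32>() when the head is empty or even the first batch starts past the request. Below is a minimal, self-contained sketch of that lookup; TFakeBatch and the sample values are hypothetical stand-ins, not the YDB types.

```cpp
// Illustrative only: a simplified model of the FindPos/IsGreaterThan logic in this PR.
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

struct TFakeBatch {
    uint64_t Offset;
    uint16_t PartNo;
    // Mirrors TBatch::IsGreaterThan: true if this batch starts after (offset, partNo).
    bool IsGreaterThan(uint64_t offset, uint16_t partNo) const {
        return Offset > offset || (Offset == offset && PartNo > partNo);
    }
};

// Index of the last batch that starts at or before (offset, partNo),
// or Max<ui32> if the head is empty or the first batch already starts after the request.
uint32_t FindPos(const std::vector<TFakeBatch>& batches, uint64_t offset, uint16_t partNo) {
    const uint32_t Max = std::numeric_limits<uint32_t>::max();
    if (batches.empty()) {
        return Max;
    }
    uint32_t i = batches.size() - 1;
    while (i > 0 && batches[i].IsGreaterThan(offset, partNo)) {
        --i;
    }
    if (i == 0 && batches[0].IsGreaterThan(offset, partNo)) {
        return Max;
    }
    return i;
}

int main() {
    std::vector<TFakeBatch> head{{10, 0}, {20, 0}, {20, 3}};
    assert(FindPos(head, 25, 0) == 2);  // past the last batch -> last index
    assert(FindPos(head, 20, 1) == 1);  // between {20,0} and {20,3}
    assert(FindPos(head, 5, 0) == std::numeric_limits<uint32_t>::max());  // before the head
}
```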
72 changes: 52 additions & 20 deletions ydb/core/persqueue/blob.h
@@ -121,38 +121,30 @@ struct TBatch {
TVector<ui32> InternalPartsPos;
NKikimrPQ::TBatchHeader Header;
TBuffer PackedData;

TBatch()
: Packed(false)
{
PackedData.Reserve(8_MB);
}

TBatch(const ui64 offset, const ui16 partNo, const TVector<TClientBlob>& blobs)
: Packed(false)
TBatch(const ui64 offset, const ui16 partNo)
: TBatch()
{
PackedData.Reserve(8_MB);
Header.SetOffset(offset);
Header.SetPartNo(partNo);
Header.SetUnpackedSize(0);
Header.SetCount(0);
Header.SetInternalPartsCount(0);
for (auto& b : blobs) {
AddBlob(b);
}
}

TBatch(const ui64 offset, const ui16 partNo, const std::deque<TClientBlob>& blobs)
: Packed(false)
{
PackedData.Reserve(8_MB);
Header.SetOffset(offset);
Header.SetPartNo(partNo);
Header.SetUnpackedSize(0);
Header.SetCount(0);
Header.SetInternalPartsCount(0);
static TBatch FromBlobs(const ui64 offset, std::deque<TClientBlob>&& blobs) {
Y_ABORT_UNLESS(!blobs.empty());
TBatch batch(offset, blobs.front().GetPartNo());
for (auto& b : blobs) {
AddBlob(b);
batch.AddBlob(b);
}
return batch;
}

void AddBlob(const TClientBlob &b) {
@@ -187,6 +179,9 @@ struct TBatch {
ui16 GetInternalPartsCount() const {
return Header.GetInternalPartsCount();
}
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
}

TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
: Packed(true)
@@ -198,9 +193,9 @@ struct TBatch {
ui32 GetPackedSize() const { Y_ABORT_UNLESS(Packed); return sizeof(ui16) + PackedData.size() + Header.ByteSize(); }
void Pack();
void Unpack();
void UnpackTo(TVector<TClientBlob> *result);
void UnpackToType0(TVector<TClientBlob> *result);
void UnpackToType1(TVector<TClientBlob> *result);
void UnpackTo(TVector<TClientBlob> *result) const;
void UnpackToType0(TVector<TClientBlob> *result) const;
void UnpackToType1(TVector<TClientBlob> *result) const;

void SerializeTo(TString& res) const;

@@ -232,14 +227,39 @@ class TBlobIterator {
ui16 InternalPartsCount;
};

class TPartitionedBlob;

//THead represents batches stored in the head (at most 8 MB)
struct THead {
std::deque<TBatch> Batches;
//all batches except last must be packed
// BlobsSize <= 512Kb
// size of Blobs after packing must be <= BlobsSize
//otherwise the head is not compacted in full and some blobs remain in the head
//PackedSize + BlobsSize must be <= 8Mb
private:
std::deque<TBatch> Batches;
ui16 InternalPartsCount = 0;

friend class TPartitionedBlob;

class TBatchAccessor {
TBatch& Batch;

public:
explicit TBatchAccessor(TBatch& batch)
: Batch(batch)
{}

void Pack() {
Batch.Pack();
}

void Unpack() {
Batch.Unpack();
}
};

public:
ui64 Offset;
ui16 PartNo;
ui32 PackedSize;
@@ -261,6 +281,18 @@ struct THead {
//returns Max<ui32> if there is no such position in the head
//otherwise returns the index of the batch containing this position
ui32 FindPos(const ui64 offset, const ui16 partNo) const;

void AddBatch(const TBatch& batch);
void ClearBatches();
const std::deque<TBatch>& GetBatches() const;
const TBatch& GetBatch(ui32 idx) const;
const TBatch& GetLastBatch() const;
TBatchAccessor MutableBatch(ui32 idx);
TBatchAccessor MutableLastBatch();
TBatch ExtractFirstBatch();
void AddBlob(const TClientBlob& blob);

friend IOutputStream& operator <<(IOutputStream& out, const THead& value);
};

IOutputStream& operator <<(IOutputStream& out, const THead& value);
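The blob.h change makes Batches a private member and keeps a running InternalPartsCount that AddBatch/AddBlob/ExtractFirstBatch/ClearBatches adjust, so THead::GetInternalPartsCount() no longer loops over all batches. A stripped-down sketch of that bookkeeping pattern follows; TMiniBatch/TMiniHead are simplified illustrations, not the actual classes.

```cpp
// Illustrative only: the running-counter pattern used by THead in this PR.
// The count is adjusted on every mutation, so reading it is O(1).
#include <cassert>
#include <cstdint>
#include <deque>

struct TMiniBatch {
    uint16_t InternalParts = 0;
    uint16_t GetInternalPartsCount() const { return InternalParts; }
};

class TMiniHead {
    std::deque<TMiniBatch> Batches;
    uint16_t InternalPartsCount = 0;  // cached sum over Batches

public:
    void AddBatch(const TMiniBatch& batch) {
        Batches.push_back(batch);
        InternalPartsCount += batch.GetInternalPartsCount();
    }
    TMiniBatch ExtractFirstBatch() {
        TMiniBatch batch = Batches.front();
        InternalPartsCount -= batch.GetInternalPartsCount();
        Batches.pop_front();
        return batch;
    }
    void ClearBatches() {
        Batches.clear();
        InternalPartsCount = 0;
    }
    uint16_t GetInternalPartsCount() const { return InternalPartsCount; }  // O(1), no loop
};

int main() {
    TMiniHead head;
    head.AddBatch({2});
    head.AddBatch({3});
    assert(head.GetInternalPartsCount() == 5);
    head.ExtractFirstBatch();
    assert(head.GetInternalPartsCount() == 3);
    head.ClearBatches();
    assert(head.GetInternalPartsCount() == 0);
}
```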
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_init.cpp
@@ -634,7 +634,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
Y_ABORT_UNLESS(size == read.GetValue().size());

for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
head.Batches.emplace_back(it.GetBatch());
head.AddBatch(it.GetBatch());
}
head.PackedSize += size;

8 changes: 4 additions & 4 deletions ydb/core/persqueue/partition_read.cpp
@@ -626,13 +626,13 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
Y_ABORT_UNLESS(pos != Max<ui32>());
}
ui32 lastBlobSize = 0;
for (;pos < Head.Batches.size(); ++pos) {
for (;pos < Head.GetBatches().size(); ++pos) {

TVector<TClientBlob> blobs;
Head.Batches[pos].UnpackTo(&blobs);
Head.GetBatch(pos).UnpackTo(&blobs);
ui32 i = 0;
ui64 offset = Head.Batches[pos].GetOffset();
ui16 pno = Head.Batches[pos].GetPartNo();
ui64 offset = Head.GetBatch(pos).GetOffset();
ui16 pno = Head.GetBatch(pos).GetPartNo();
for (; i < blobs.size(); ++i) {

ui64 curOffset = offset;
47 changes: 23 additions & 24 deletions ydb/core/persqueue/partition_write.cpp
@@ -405,7 +405,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
Head.PackedSize = 0;
Head.Offset = NewHead.Offset;
Head.PartNo = NewHead.PartNo; //no partNo at this point
Head.Batches.clear();
Head.ClearBatches();
}

while (!CompactedKeys.empty()) {
@@ -428,9 +428,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
} // head cleared, all data moved to body

//append Head with newHead
while (!NewHead.Batches.empty()) {
Head.Batches.push_back(NewHead.Batches.front());
NewHead.Batches.pop_front();
while (!NewHead.GetBatches().empty()) {
Head.AddBatch(NewHead.ExtractFirstBatch());
}
Head.PackedSize += NewHead.PackedSize;

@@ -1324,22 +1323,22 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
ctx);
ui32 countOfLastParts = 0;
for (auto& x : PartitionedBlob.GetClientBlobs()) {
if (NewHead.Batches.empty() || NewHead.Batches.back().Packed) {
NewHead.Batches.emplace_back(curOffset, x.GetPartNo(), TVector<TClientBlob>());
if (NewHead.GetBatches().empty() || NewHead.GetLastBatch().Packed) {
NewHead.AddBatch(TBatch(curOffset, x.GetPartNo()));
NewHead.PackedSize += GetMaxHeaderSize(); //upper bound for packed size
}
if (x.IsLastPart()) {
++countOfLastParts;
}
Y_ABORT_UNLESS(!NewHead.Batches.back().Packed);
NewHead.Batches.back().AddBlob(x);
Y_ABORT_UNLESS(!NewHead.GetLastBatch().Packed);
NewHead.AddBlob(x);
NewHead.PackedSize += x.GetBlobSize();
if (NewHead.Batches.back().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) {
NewHead.Batches.back().Pack();
NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob
if (NewHead.GetLastBatch().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) {
NewHead.MutableLastBatch().Pack();
NewHead.PackedSize += NewHead.GetLastBatch().GetPackedSize(); //add real packed size for this blob

NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
NewHead.PackedSize -= NewHead.GetLastBatch().GetUnpackedSize();
}
}

@@ -1416,15 +1415,15 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
valueD.reserve(res.second);
ui32 pp = Head.FindPos(key.GetOffset(), key.GetPartNo());
if (pp < Max<ui32>() && key.GetOffset() < EndOffset) { //this batch truly contains this offset
Y_ABORT_UNLESS(pp < Head.Batches.size());
Y_ABORT_UNLESS(Head.Batches[pp].GetOffset() == key.GetOffset());
Y_ABORT_UNLESS(Head.Batches[pp].GetPartNo() == key.GetPartNo());
for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here
Y_ABORT_UNLESS(Head.Batches[pp].Packed);
Head.Batches[pp].SerializeTo(valueD);
Y_ABORT_UNLESS(pp < Head.GetBatches().size());
Y_ABORT_UNLESS(Head.GetBatch(pp).GetOffset() == key.GetOffset());
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
Head.GetBatch(pp).SerializeTo(valueD);
}
}
for (auto& b : NewHead.Batches) {
for (auto& b : NewHead.GetBatches()) {
Y_ABORT_UNLESS(b.Packed);
b.SerializeTo(valueD);
}
@@ -1701,7 +1700,7 @@ void TPartition::BeginAppendHeadWithNewWrites(const TActorContext& ctx)
NewHead.PartNo = 0;
NewHead.PackedSize = 0;

Y_ABORT_UNLESS(NewHead.Batches.empty());
Y_ABORT_UNLESS(NewHead.GetBatches().empty());

Parameters->OldPartsCleared = false;
Parameters->HeadCleared = (Head.PackedSize == 0);
@@ -1746,12 +1745,12 @@ void TPartition::EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, co

UpdateWriteBufferIsFullState(ctx.Now());

if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
NewHead.Batches.back().Pack();
NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob
if (!NewHead.GetBatches().empty() && !NewHead.GetLastBatch().Packed) {
NewHead.MutableLastBatch().Pack();
NewHead.PackedSize += NewHead.GetLastBatch().GetPackedSize(); //add real packed size for this blob

NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
NewHead.PackedSize -= NewHead.GetLastBatch().GetUnpackedSize();
}

Y_ABORT_UNLESS((Parameters->HeadCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl
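On the PackedSize bookkeeping in ExecRequest and EndAppendHeadWithNewWrites above: opening a batch reserves GetMaxHeaderSize() as an upper bound, each blob adds its raw size, and once the batch is packed the real packed size replaces both the header bound and the unpacked bytes. The numbers in the sketch below are assumptions chosen only to make the arithmetic visible; the real values come from GetMaxHeaderSize() and TBatch::GetPackedSize().

```cpp
// Illustrative only: the NewHead.PackedSize accounting pattern, with made-up sizes.
#include <cassert>
#include <cstdint>

int main() {
    const uint32_t maxHeaderSize = 100;   // assumed upper bound for a batch header
    uint64_t packedSize = 0;

    // A new, still-unpacked batch is opened: reserve the header upper bound.
    packedSize += maxHeaderSize;

    // Blobs are appended; their raw (unpacked) sizes are counted as they arrive.
    const uint32_t blobSizes[] = {4000, 6000};
    uint32_t unpackedSize = 0;
    for (uint32_t size : blobSizes) {
        packedSize += size;
        unpackedSize += size;
    }
    assert(packedSize == 10100);

    // The batch crosses the pack border and gets packed: swap the optimistic
    // estimate (header bound + unpacked bytes) for the real packed size.
    const uint32_t realPackedSize = 7500;  // assumed result of GetPackedSize()
    packedSize += realPackedSize;
    packedSize -= maxHeaderSize;
    packedSize -= unpackedSize;
    assert(packedSize == 7500);  // only the packed representation remains counted
}
```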