Skip to content

Commit 74fedbe

Browse files
authored
Merge cc98ff1 into 3296632
2 parents 3296632 + cc98ff1 commit 74fedbe

File tree

6 files changed

+172
-87
lines changed

6 files changed

+172
-87
lines changed

ydb/core/persqueue/blob.cpp

+72-18
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ void TBatch::Unpack() {
422422
PackedData.Clear();
423423
}
424424

425-
void TBatch::UnpackTo(TVector<TClientBlob> *blobs)
425+
void TBatch::UnpackTo(TVector<TClientBlob> *blobs) const
426426
{
427427
Y_ABORT_UNLESS(PackedData.size());
428428
auto type = Header.GetFormat();
@@ -446,7 +446,7 @@ NScheme::TDataRef GetChunk(const char*& data, const char *end)
446446
return NScheme::TDataRef(data - size, size);
447447
}
448448

449-
void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
449+
void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
450450
Y_ABORT_UNLESS(Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed);
451451
Y_ABORT_UNLESS(PackedData.size());
452452
ui32 totalBlobs = Header.GetCount() + Header.GetInternalPartsCount();
@@ -606,7 +606,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
606606
}
607607
}
608608

609-
void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) {
609+
void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) const {
610610
Y_ABORT_UNLESS(Header.GetFormat() == NKikimrPQ::TBatchHeader::EUncompressed);
611611
Y_ABORT_UNLESS(PackedData.size());
612612
ui32 shift = 0;
@@ -640,7 +640,7 @@ ui32 TBatch::FindPos(const ui64 offset, const ui16 partNo) const {
640640
void THead::Clear()
641641
{
642642
Offset = PartNo = PackedSize = 0;
643-
Batches.clear();
643+
ClearBatches();
644644
}
645645

646646
ui64 THead::GetNextOffset() const
@@ -650,11 +650,7 @@ ui64 THead::GetNextOffset() const
650650

651651
ui16 THead::GetInternalPartsCount() const
652652
{
653-
ui16 res = 0;
654-
for (auto& b : Batches) {
655-
res += b.GetInternalPartsCount();
656-
}
657-
return res;
653+
return InternalPartsCount;
658654
}
659655

660656
ui32 THead::GetCount() const
@@ -675,15 +671,73 @@ IOutputStream& operator <<(IOutputStream& out, const THead& value)
675671
}
676672

677673
ui32 THead::FindPos(const ui64 offset, const ui16 partNo) const {
678-
ui32 i = 0;
679-
for (; i < Batches.size(); ++i) {
680-
//this batch contains blobs with position bigger than requested
681-
if (Batches[i].GetOffset() > offset || Batches[i].GetOffset() == offset && Batches[i].GetPartNo() > partNo)
682-
break;
683-
}
684-
if (i == 0)
674+
if (Batches.empty()) {
685675
return Max<ui32>();
686-
return i - 1;
676+
}
677+
678+
ui32 i = Batches.size() - 1;
679+
while (i > 0 && Batches[i].IsGreaterThan(offset, partNo)) {
680+
--i;
681+
}
682+
683+
if (i == 0) {
684+
if (Batches[i].IsGreaterThan(offset, partNo)) {
685+
return Max<ui32>();
686+
} else {
687+
return 0;
688+
}
689+
}
690+
691+
return i;
692+
}
693+
694+
void THead::AddBatch(const TBatch& batch) {
695+
auto& b = Batches.emplace_back(batch);
696+
InternalPartsCount += b.GetInternalPartsCount();
697+
}
698+
699+
void THead::ClearBatches() {
700+
Batches.clear();
701+
InternalPartsCount = 0;
702+
}
703+
704+
const std::deque<TBatch>& THead::GetBatches() const {
705+
return Batches;
706+
}
707+
708+
const TBatch& THead::GetBatch(ui32 idx) const {
709+
return Batches.at(idx);
710+
}
711+
712+
const TBatch& THead::GetLastBatch() const {
713+
Y_ABORT_UNLESS(!Batches.empty());
714+
return Batches.back();
715+
}
716+
717+
TBatch THead::ExtractFirstBatch() {
718+
Y_ABORT_UNLESS(!Batches.empty());
719+
auto batch = std::move(Batches.front());
720+
InternalPartsCount -= batch.GetInternalPartsCount();
721+
Batches.pop_front();
722+
return batch;
723+
}
724+
725+
THead::TBatchAccessor THead::MutableBatch(ui32 idx) {
726+
Y_ABORT_UNLESS(idx < Batches.size());
727+
return TBatchAccessor(Batches[idx]);
728+
}
729+
730+
THead::TBatchAccessor THead::MutableLastBatch() {
731+
Y_ABORT_UNLESS(!Batches.empty());
732+
return TBatchAccessor(Batches.back());
733+
}
734+
735+
void THead::AddBlob(const TClientBlob& blob) {
736+
Y_ABORT_UNLESS(!Batches.empty());
737+
auto& batch = Batches.back();
738+
InternalPartsCount -= batch.GetInternalPartsCount();
739+
batch.AddBlob(blob);
740+
InternalPartsCount += batch.GetInternalPartsCount();
687741
}
688742

689743
TPartitionedBlob::TRenameFormedBlobInfo::TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size) :
@@ -832,7 +886,7 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
832886

833887
GlueHead = GlueNewHead = false;
834888
if (!Blobs.empty()) {
835-
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
889+
auto batch = TBatch::FromBlobs(Offset, std::move(Blobs));
836890
Blobs.clear();
837891
batch.Pack();
838892
Y_ABORT_UNLESS(batch.Packed);

ydb/core/persqueue/blob.h

+52-20
Original file line numberDiff line numberDiff line change
@@ -121,38 +121,30 @@ struct TBatch {
121121
TVector<ui32> InternalPartsPos;
122122
NKikimrPQ::TBatchHeader Header;
123123
TBuffer PackedData;
124+
124125
TBatch()
125126
: Packed(false)
126127
{
127128
PackedData.Reserve(8_MB);
128129
}
129130

130-
TBatch(const ui64 offset, const ui16 partNo, const TVector<TClientBlob>& blobs)
131-
: Packed(false)
131+
TBatch(const ui64 offset, const ui16 partNo)
132+
: TBatch()
132133
{
133-
PackedData.Reserve(8_MB);
134134
Header.SetOffset(offset);
135135
Header.SetPartNo(partNo);
136136
Header.SetUnpackedSize(0);
137137
Header.SetCount(0);
138138
Header.SetInternalPartsCount(0);
139-
for (auto& b : blobs) {
140-
AddBlob(b);
141-
}
142139
}
143140

144-
TBatch(const ui64 offset, const ui16 partNo, const std::deque<TClientBlob>& blobs)
145-
: Packed(false)
146-
{
147-
PackedData.Reserve(8_MB);
148-
Header.SetOffset(offset);
149-
Header.SetPartNo(partNo);
150-
Header.SetUnpackedSize(0);
151-
Header.SetCount(0);
152-
Header.SetInternalPartsCount(0);
141+
static TBatch FromBlobs(const ui64 offset, std::deque<TClientBlob>&& blobs) {
142+
Y_ABORT_UNLESS(!blobs.empty());
143+
TBatch batch(offset, blobs.front().GetPartNo());
153144
for (auto& b : blobs) {
154-
AddBlob(b);
145+
batch.AddBlob(b);
155146
}
147+
return batch;
156148
}
157149

158150
void AddBlob(const TClientBlob &b) {
@@ -187,6 +179,9 @@ struct TBatch {
187179
ui16 GetInternalPartsCount() const {
188180
return Header.GetInternalPartsCount();
189181
}
182+
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
183+
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
184+
}
190185

191186
TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
192187
: Packed(true)
@@ -198,9 +193,9 @@ struct TBatch {
198193
ui32 GetPackedSize() const { Y_ABORT_UNLESS(Packed); return sizeof(ui16) + PackedData.size() + Header.ByteSize(); }
199194
void Pack();
200195
void Unpack();
201-
void UnpackTo(TVector<TClientBlob> *result);
202-
void UnpackToType0(TVector<TClientBlob> *result);
203-
void UnpackToType1(TVector<TClientBlob> *result);
196+
void UnpackTo(TVector<TClientBlob> *result) const;
197+
void UnpackToType0(TVector<TClientBlob> *result) const;
198+
void UnpackToType1(TVector<TClientBlob> *result) const;
204199

205200
void SerializeTo(TString& res) const;
206201

@@ -232,14 +227,39 @@ class TBlobIterator {
232227
ui16 InternalPartsCount;
233228
};
234229

230+
class TPartitionedBlob;
231+
235232
//THead represents batches, stored in head (at most 8 Mb)
236233
struct THead {
237-
std::deque<TBatch> Batches;
238234
//all batches except last must be packed
239235
// BlobsSize <= 512Kb
240236
// size of Blobs after packing must be <= BlobsSize
241237
//otherwise head will be compacted not in total, some blobs will still remain in head
242238
//PackedSize + BlobsSize must be <= 8Mb
239+
private:
240+
std::deque<TBatch> Batches;
241+
ui16 InternalPartsCount = 0;
242+
243+
friend class TPartitionedBlob;
244+
245+
class TBatchAccessor {
246+
TBatch& Batch;
247+
248+
public:
249+
explicit TBatchAccessor(TBatch& batch)
250+
: Batch(batch)
251+
{}
252+
253+
void Pack() {
254+
Batch.Pack();
255+
}
256+
257+
void Unpack() {
258+
Batch.Unpack();
259+
}
260+
};
261+
262+
public:
243263
ui64 Offset;
244264
ui16 PartNo;
245265
ui32 PackedSize;
@@ -261,6 +281,18 @@ struct THead {
261281
//returns Max<ui32> if there is no such pos in head
262282
//returns batch with such position
263283
ui32 FindPos(const ui64 offset, const ui16 partNo) const;
284+
285+
void AddBatch(const TBatch& batch);
286+
void ClearBatches();
287+
const std::deque<TBatch>& GetBatches() const;
288+
const TBatch& GetBatch(ui32 idx) const;
289+
const TBatch& GetLastBatch() const;
290+
TBatchAccessor MutableBatch(ui32 idx);
291+
TBatchAccessor MutableLastBatch();
292+
TBatch ExtractFirstBatch();
293+
void AddBlob(const TClientBlob& blob);
294+
295+
friend IOutputStream& operator <<(IOutputStream& out, const THead& value);
264296
};
265297

266298
IOutputStream& operator <<(IOutputStream& out, const THead& value);

ydb/core/persqueue/partition_init.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
634634
Y_ABORT_UNLESS(size == read.GetValue().size());
635635

636636
for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
637-
head.Batches.emplace_back(it.GetBatch());
637+
head.AddBatch(it.GetBatch());
638638
}
639639
head.PackedSize += size;
640640

ydb/core/persqueue/partition_read.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -626,13 +626,13 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
626626
Y_ABORT_UNLESS(pos != Max<ui32>());
627627
}
628628
ui32 lastBlobSize = 0;
629-
for (;pos < Head.Batches.size(); ++pos) {
629+
for (;pos < Head.GetBatches().size(); ++pos) {
630630

631631
TVector<TClientBlob> blobs;
632-
Head.Batches[pos].UnpackTo(&blobs);
632+
Head.GetBatch(pos).UnpackTo(&blobs);
633633
ui32 i = 0;
634-
ui64 offset = Head.Batches[pos].GetOffset();
635-
ui16 pno = Head.Batches[pos].GetPartNo();
634+
ui64 offset = Head.GetBatch(pos).GetOffset();
635+
ui16 pno = Head.GetBatch(pos).GetPartNo();
636636
for (; i < blobs.size(); ++i) {
637637

638638
ui64 curOffset = offset;

ydb/core/persqueue/partition_write.cpp

+23-24
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
405405
Head.PackedSize = 0;
406406
Head.Offset = NewHead.Offset;
407407
Head.PartNo = NewHead.PartNo; //no partNo at this point
408-
Head.Batches.clear();
408+
Head.ClearBatches();
409409
}
410410

411411
while (!CompactedKeys.empty()) {
@@ -428,9 +428,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
428428
} // head cleared, all data moved to body
429429

430430
//append Head with newHead
431-
while (!NewHead.Batches.empty()) {
432-
Head.Batches.push_back(NewHead.Batches.front());
433-
NewHead.Batches.pop_front();
431+
while (!NewHead.GetBatches().empty()) {
432+
Head.AddBatch(NewHead.ExtractFirstBatch());
434433
}
435434
Head.PackedSize += NewHead.PackedSize;
436435

@@ -1324,22 +1323,22 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
13241323
ctx);
13251324
ui32 countOfLastParts = 0;
13261325
for (auto& x : PartitionedBlob.GetClientBlobs()) {
1327-
if (NewHead.Batches.empty() || NewHead.Batches.back().Packed) {
1328-
NewHead.Batches.emplace_back(curOffset, x.GetPartNo(), TVector<TClientBlob>());
1326+
if (NewHead.GetBatches().empty() || NewHead.GetLastBatch().Packed) {
1327+
NewHead.AddBatch(TBatch(curOffset, x.GetPartNo()));
13291328
NewHead.PackedSize += GetMaxHeaderSize(); //upper bound for packed size
13301329
}
13311330
if (x.IsLastPart()) {
13321331
++countOfLastParts;
13331332
}
1334-
Y_ABORT_UNLESS(!NewHead.Batches.back().Packed);
1335-
NewHead.Batches.back().AddBlob(x);
1333+
Y_ABORT_UNLESS(!NewHead.GetLastBatch().Packed);
1334+
NewHead.AddBlob(x);
13361335
NewHead.PackedSize += x.GetBlobSize();
1337-
if (NewHead.Batches.back().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) {
1338-
NewHead.Batches.back().Pack();
1339-
NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob
1336+
if (NewHead.GetLastBatch().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) {
1337+
NewHead.MutableLastBatch().Pack();
1338+
NewHead.PackedSize += NewHead.GetLastBatch().GetPackedSize(); //add real packed size for this blob
13401339

13411340
NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound
1342-
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
1341+
NewHead.PackedSize -= NewHead.GetLastBatch().GetUnpackedSize();
13431342
}
13441343
}
13451344

@@ -1416,15 +1415,15 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
14161415
valueD.reserve(res.second);
14171416
ui32 pp = Head.FindPos(key.GetOffset(), key.GetPartNo());
14181417
if (pp < Max<ui32>() && key.GetOffset() < EndOffset) { //this batch trully contains this offset
1419-
Y_ABORT_UNLESS(pp < Head.Batches.size());
1420-
Y_ABORT_UNLESS(Head.Batches[pp].GetOffset() == key.GetOffset());
1421-
Y_ABORT_UNLESS(Head.Batches[pp].GetPartNo() == key.GetPartNo());
1422-
for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here
1423-
Y_ABORT_UNLESS(Head.Batches[pp].Packed);
1424-
Head.Batches[pp].SerializeTo(valueD);
1418+
Y_ABORT_UNLESS(pp < Head.GetBatches().size());
1419+
Y_ABORT_UNLESS(Head.GetBatch(pp).GetOffset() == key.GetOffset());
1420+
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
1421+
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
1422+
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
1423+
Head.GetBatch(pp).SerializeTo(valueD);
14251424
}
14261425
}
1427-
for (auto& b : NewHead.Batches) {
1426+
for (auto& b : NewHead.GetBatches()) {
14281427
Y_ABORT_UNLESS(b.Packed);
14291428
b.SerializeTo(valueD);
14301429
}
@@ -1701,7 +1700,7 @@ void TPartition::BeginAppendHeadWithNewWrites(const TActorContext& ctx)
17011700
NewHead.PartNo = 0;
17021701
NewHead.PackedSize = 0;
17031702

1704-
Y_ABORT_UNLESS(NewHead.Batches.empty());
1703+
Y_ABORT_UNLESS(NewHead.GetBatches().empty());
17051704

17061705
Parameters->OldPartsCleared = false;
17071706
Parameters->HeadCleared = (Head.PackedSize == 0);
@@ -1746,12 +1745,12 @@ void TPartition::EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, co
17461745

17471746
UpdateWriteBufferIsFullState(ctx.Now());
17481747

1749-
if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
1750-
NewHead.Batches.back().Pack();
1751-
NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob
1748+
if (!NewHead.GetBatches().empty() && !NewHead.GetLastBatch().Packed) {
1749+
NewHead.MutableLastBatch().Pack();
1750+
NewHead.PackedSize += NewHead.GetLastBatch().GetPackedSize(); //add real packed size for this blob
17521751

17531752
NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound
1754-
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
1753+
NewHead.PackedSize -= NewHead.GetLastBatch().GetUnpackedSize();
17551754
}
17561755

17571756
Y_ABORT_UNLESS((Parameters->HeadCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl

0 commit comments

Comments
 (0)