Skip to content

Support skipping blob header in TDiskBlob #3145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class TVDiskReplProxyReaderActor : public TActorBootstrapped<TVDiskReplProxyRead

ReplCtx = std::make_shared<TReplCtx>(
VCtx,
nullptr,
nullptr, // PDiskCtx
nullptr, // HugeBlobCtx
nullptr,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class TBsCostModelBase {
const NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass();
const ui64 size = record.HasBuffer() ? record.GetBuffer().size() : ev.GetPayload(0).GetSize();

NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size, true);
if (handleType == NPriPut::Log) {
return WriteCost(size);
} else {
Expand All @@ -247,7 +247,7 @@ class TBsCostModelBase {

for (ui64 idx = 0; idx < record.ItemsSize(); ++idx) {
const ui64 size = ev.GetBufferBytes(idx);
NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size, true);
if (handleType == NPriPut::Log) {
cost += WriteCost(size);
} else {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace NKikimr {
HullCompMaxInFlightReads = 20;
HullCompReadBatchEfficiencyThreshold = 0.5; // don't issue reads if there are more gaps than the useful data
AnubisOsirisMaxInFly = 1000;
AddHeader = true;

RecoveryLogCutterFirstDuration = TDuration::Seconds(10);
RecoveryLogCutterRegularDuration = TDuration::Seconds(30);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ namespace NKikimr {
ui32 HullCompMaxInFlightReads;
double HullCompReadBatchEfficiencyThreshold;
ui64 AnubisOsirisMaxInFly;
bool AddHeader;

//////////////// LOG CUTTER SETTINGS ////////////////
TDuration RecoveryLogCutterFirstDuration;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/blobstorage/vdisk/common/vdisk_costmodel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ namespace NKikimr {
const NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass();
const ui64 bufSize = record.HasBuffer() ? record.GetBuffer().size() : ev.GetPayload(0).GetSize();

NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, bufSize);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, bufSize, true);
if (handleType == NPriPut::Log) {
*logPutInternalQueue = true;
return SmallWriteCost(bufSize);
Expand All @@ -197,7 +197,7 @@ namespace NKikimr {
ui64 cost = 0;
for (ui64 idx = 0; idx < record.ItemsSize(); ++idx) {
const ui64 size = ev.GetBufferBytes(idx);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, size, true);
if (handleType == NPriPut::Log) {
cost += SmallWriteCost(size);
} else {
Expand Down Expand Up @@ -264,7 +264,7 @@ namespace NKikimr {
cost += MovedPatchCostBySize(essence.MovedPatchBlobSize);
}
for (ui64 size : essence.PutBufferSizes) {
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, essence.HandleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, essence.HandleClass, size, true);
if (handleType == NPriPut::Log) {
cost += SmallWriteCost(size);
} else {
Expand Down
12 changes: 2 additions & 10 deletions ydb/core/blobstorage/vdisk/common/vdisk_handle_class.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace NKikimr {
namespace NPriPut {

EHandleType HandleType(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead) {
ui32 originalBufSizeWithoutOverhead, bool addHeader) {
// the size this blob would have if it were stored as a huge blob
const ui64 hugeBlobSize = TDiskBlob::HugeBlobOverhead + originalBufSizeWithoutOverhead;
const ui64 hugeBlobSize = (addHeader ? TDiskBlob::HeaderSize : 0) + originalBufSizeWithoutOverhead;

switch (handleClass) {
case NKikimrBlobStorage::TabletLog:
Expand All @@ -25,13 +25,5 @@ namespace NKikimr {
}
}

bool IsHandleTypeLog(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead) {
const NPriPut::EHandleType handleType = NPriPut::HandleType(minREALHugeBlobSize, handleClass,
originalBufSizeWithoutOverhead);
const bool isHandleTypeLog = handleType == NPriPut::Log;
return isHandleTypeLog;
}

} // NPriPut
} // NKikimr
4 changes: 1 addition & 3 deletions ydb/core/blobstorage/vdisk/common/vdisk_handle_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ namespace NKikimr {
};

EHandleType HandleType(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead);
ui32 originalBufSizeWithoutOverhead, bool addHeader);

bool IsHandleTypeLog(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead);
} // NPriPut

} // NKikimr
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace NKikimr {

// check whether this blob is huge one; userPartSize doesn't include any metadata stored along with blob
bool THugeBlobCtx::IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const {
return gtype.MaxPartSize(fullId) + TDiskBlob::HugeBlobOverhead >= MinREALHugeBlobInBytes;
return gtype.MaxPartSize(fullId) + (AddHeader ? TDiskBlob::HeaderSize : 0) >= MinREALHugeBlobInBytes;
}

} // NKikimr
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ namespace NKikimr {
// this value is a multiple of AppendBlockSize and is calculated from Config->MinHugeBlobSize
const ui32 MinREALHugeBlobInBytes;
const std::shared_ptr<const THugeSlotsMap> HugeSlotsMap;
const bool AddHeader;

// check whether this blob is huge one; userPartSize doesn't include any metadata stored along with blob
// check whether this NEW blob is huge one; userPartSize doesn't include any metadata stored along with blob
bool IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const;

THugeBlobCtx(ui32 minREALHugeBlobInBytes, const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap)
THugeBlobCtx(ui32 minREALHugeBlobInBytes, const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap, bool addHeader)
: MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
, HugeSlotsMap(hugeSlotsMap)
, AddHeader(addHeader)
{}
};

Expand Down
27 changes: 17 additions & 10 deletions ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,25 @@ namespace NKikimr {
Y_ABORT_UNLESS(partId);

TRcBuf data = msg->Data.ToString();
Y_ABORT_UNLESS(data.size() == TDiskBlob::HeaderSize + gtype.PartSize(rec.LogoBlobId));
const char *header = data.data();

ui32 fullDataSize;
memcpy(&fullDataSize, header, sizeof(fullDataSize));
header += sizeof(fullDataSize);
Y_ABORT_UNLESS(fullDataSize == rec.LogoBlobId.BlobSize());

Y_ABORT_UNLESS(NMatrix::TVectorType::MakeOneHot(partId - 1, gtype.TotalPartCount()).Raw() == static_cast<ui8>(*header));
Y_ABORT_UNLESS(data.size() == TDiskBlob::HeaderSize + gtype.PartSize(rec.LogoBlobId) ||
data.size() == gtype.PartSize(rec.LogoBlobId));

ui32 trim = 0;
if (data.size() == TDiskBlob::HeaderSize + gtype.PartSize(rec.LogoBlobId)) {
const char *header = data.data();
ui32 fullDataSize;
memcpy(&fullDataSize, header, sizeof(fullDataSize));
header += sizeof(fullDataSize);
Y_ABORT_UNLESS(fullDataSize == rec.LogoBlobId.BlobSize());
Y_ABORT_UNLESS(NMatrix::TVectorType::MakeOneHot(partId - 1, gtype.TotalPartCount()).Raw() == static_cast<ui8>(*header));
trim += TDiskBlob::HeaderSize;
}

TRope rope(std::move(data));
rope.EraseFront(TDiskBlob::HeaderSize);
if (trim) {
rope.EraseFront(trim);
}
Y_ABORT_UNLESS(rope.size() == gtype.PartSize(rec.LogoBlobId));

auto writeEvent = std::make_unique<TEvBlobStorage::TEvVPut>(rec.LogoBlobId, std::move(rope),
SelfVDiskId, true, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace NKikimr {

return std::make_shared<THugeBlobCtx>(
repairedHuge->GetMinREALHugeBlobInBytes(),
repairedHuge->Heap->BuildHugeSlotsMap());
repairedHuge->Heap->BuildHugeSlotsMap(),
true);
}


Expand Down
87 changes: 54 additions & 33 deletions ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace NKikimr {

public:
static const size_t HeaderSize = sizeof(ui32) + sizeof(ui8);
static const size_t HugeBlobOverhead = HeaderSize;

TDiskBlob() = default;

Expand All @@ -44,29 +43,41 @@ namespace NKikimr {
, Parts(parts)
{
// ensure the blob format is correct
Y_ABORT_UNLESS(Rope->GetSize() >= HeaderSize);
Y_ABORT_UNLESS(parts.GetSize() <= MaxTotalPartCount);
//Y_ABORT_UNLESS(parts.GetSize() == gtype.TotalPartCount()); // TODO(alexvru): fit UTs

ui32 blobSize = 0;
for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
blobSize += gtype.PartSize(TLogoBlobID(fullId, i + 1));
}

Y_ABORT_UNLESS(rope->GetSize() == blobSize || rope->GetSize() == blobSize + HeaderSize);

auto iter = Rope->Begin();
ui32 offset = 0;

// obtain full data size from the header
iter.ExtractPlainDataAndAdvance(&FullDataSize, sizeof(FullDataSize));
if (rope->GetSize() == blobSize + HeaderSize) {
// obtain full data size from the header
iter.ExtractPlainDataAndAdvance(&FullDataSize, sizeof(FullDataSize));

// then check the parts; we have `parts' argument to validate actual blob content
ui8 partsMask;
iter.ExtractPlainDataAndAdvance(&partsMask, sizeof(partsMask));
Y_ABORT_UNLESS(parts.Raw() == partsMask);
// then check the parts; we have `parts' argument to validate actual blob content
ui8 partsMask;
iter.ExtractPlainDataAndAdvance(&partsMask, sizeof(partsMask));
Y_ABORT_UNLESS(parts.Raw() == partsMask);

// advance offset
offset += HeaderSize;
} else {
FullDataSize = fullId.BlobSize();
}

// calculate part layout in the binary
ui32 offset = HeaderSize;
for (ui8 i = 0; i <= parts.GetSize(); ++i) {
PartOffs[i] = offset;
if (i != parts.GetSize()) {
offset += parts.Get(i) ? gtype.PartSize(TLogoBlobID(fullId, i + 1)) : 0;
}
}
Y_ABORT_UNLESS(GetSize() == Rope->GetSize(), "%" PRIu32 " != %zu", GetSize(), Rope->GetSize());
}

bool Empty() const {
Expand Down Expand Up @@ -112,8 +123,8 @@ namespace NKikimr {
return Parts;
}

ui32 GetSize() const {
return PartOffs[Parts.GetSize()];
ui32 GetBlobSize(bool addHeader) const {
return PartOffs[Parts.GetSize()] - PartOffs[0] + (addHeader ? HeaderSize : 0);
}

////////////////// Iterator via all parts ///////////////////////////////////////
Expand Down Expand Up @@ -203,19 +214,22 @@ namespace NKikimr {

public:
template<typename TPartIt>
static TRope CreateFromDistinctParts(TPartIt first, TPartIt last, NMatrix::TVectorType parts, ui64 fullDataSize, TRopeArena& arena) {
static TRope CreateFromDistinctParts(TPartIt first, TPartIt last, NMatrix::TVectorType parts, ui64 fullDataSize,
TRopeArena& arena, bool addHeader) {
// ensure that we have correct number of set parts
Y_ABORT_UNLESS(parts.CountBits() == std::distance(first, last));
Y_ABORT_UNLESS(first != last);

TRope rope;

// fill in header
char header[HeaderSize];
Y_ABORT_UNLESS(fullDataSize <= Max<ui32>());
*reinterpret_cast<ui32*>(header) = fullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = parts.Raw();
rope.Insert(rope.End(), arena.CreateRope(header, HeaderSize));
if (addHeader) {
// fill in header
char header[HeaderSize];
Y_ABORT_UNLESS(fullDataSize <= Max<ui32>());
*reinterpret_cast<ui32*>(header) = fullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = parts.Raw();
rope.Insert(rope.End(), arena.CreateRope(header, HeaderSize));
}

// then copy parts' contents to the rope
while (first != last) {
Expand All @@ -225,19 +239,22 @@ namespace NKikimr {
return rope;
}

static inline TRope Create(ui64 fullDataSize, ui8 partId, ui8 total, TRope&& data, TRopeArena& arena) {
static inline TRope Create(ui64 fullDataSize, ui8 partId, ui8 total, TRope&& data, TRopeArena& arena,
bool addHeader) {
Y_ABORT_UNLESS(partId > 0 && partId <= 8);
return CreateFromDistinctParts(&data, &data + 1, NMatrix::TVectorType::MakeOneHot(partId - 1, total),
fullDataSize, arena);
fullDataSize, arena, addHeader);
}

static inline TRope Create(ui64 fullDataSize, NMatrix::TVectorType parts, TRope&& data, TRopeArena& arena) {
return CreateFromDistinctParts(&data, &data + 1, parts, fullDataSize, arena);
static inline TRope Create(ui64 fullDataSize, NMatrix::TVectorType parts, TRope&& data, TRopeArena& arena,
bool addHeader) {
return CreateFromDistinctParts(&data, &data + 1, parts, fullDataSize, arena, addHeader);
}

// static function for calculating size of a blob being created ('Create' function creates blob of this size)
static inline ui32 CalculateBlobSize(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, NMatrix::TVectorType parts) {
ui32 res = HeaderSize;
static inline ui32 CalculateBlobSize(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, NMatrix::TVectorType parts,
bool addHeader) {
ui32 res = addHeader ? HeaderSize : 0;
for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
res += gtype.PartSize(TLogoBlobID(fullId, i + 1));
}
Expand All @@ -256,7 +273,7 @@ namespace NKikimr {

if (Parts.Empty()) {
Parts = NMatrix::TVectorType(0, source.Parts.GetSize());
PartOffs.fill(HeaderSize);
PartOffs.fill(0); // we don't care about absolute offsets here
} else {
Y_ABORT_UNLESS(Parts.GetSize() == source.Parts.GetSize());
}
Expand All @@ -273,14 +290,18 @@ namespace NKikimr {
}
}

TRope CreateDiskBlob(TRopeArena& arena) const {
TRope CreateDiskBlob(TRopeArena& arena, bool addHeader) const {
Y_ABORT_UNLESS(!Empty());

char header[HeaderSize];
*reinterpret_cast<ui32*>(header) = FullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = Parts.Raw();
TRope rope;

if (addHeader) {
char header[HeaderSize];
*reinterpret_cast<ui32*>(header) = FullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = Parts.Raw();
rope.Insert(rope.End(), arena.CreateRope(header, sizeof(header)));
}

TRope rope(arena.CreateRope(header, sizeof(header)));
for (auto it = begin(); it != end(); ++it) {
rope.Insert(rope.End(), it.GetPart());
}
Expand Down Expand Up @@ -315,8 +336,8 @@ namespace NKikimr {
return Blob.Empty();
}

TRope CreateDiskBlob(TRopeArena& arena) const {
return Blob.CreateDiskBlob(arena);
TRope CreateDiskBlob(TRopeArena& arena, bool addHeader) const {
return Blob.CreateDiskBlob(arena, addHeader);
}

const TDiskBlob& GetDiskBlob() const {
Expand Down
Loading