Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ namespace NKikimr {
// calculate keep status
bool allowKeepFlags = HullCtx->AllowKeepFlags;
NGc::TKeepStatus keep = brs->Keep(dbIt.GetCurKey(), dbMerger.GetMemRec(),
dbMerger.GetMemRecsMerged(), allowKeepFlags,
true /*allowGarbageCollection*/);
{subsMerger, dbMerger}, allowKeepFlags, true /*allowGarbageCollection*/);
if (keep.KeepIndex && !keep.KeepByBarrier) {
// we keep this record because of keep flags
candidates.AddCandidate(dbIt.GetCurKey().LogoBlobID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ namespace NKikimr {

bool Check(const TKeyLogoBlob &key,
const TMemRecLogoBlob &memRec,
ui32 recsMerged,
bool allowKeepFlags) const {
return BarriersEssence->Keep(key, memRec, recsMerged, allowKeepFlags, false /*allowGarbageCollection*/).KeepData;
return BarriersEssence->Keep(key, memRec, {}, allowKeepFlags, false /*allowGarbageCollection*/).KeepData;
}

TIntrusivePtr<THullCtx> HullCtx;
Expand Down Expand Up @@ -92,7 +91,7 @@ namespace NKikimr {
const auto& topology = *HullCtx->VCtx->Top; // topology we have
Y_ABORT_UNLESS(topology.BelongsToSubgroup(self, CurKey.Hash())); // check that blob belongs to subgroup

if (!Filter->Check(CurKey, CurIt.GetMemRec(), CurIt.GetMemRecsMerged(), HullCtx->AllowKeepFlags)) {
if (!Filter->Check(CurKey, CurIt.GetMemRec(), HullCtx->AllowKeepFlags)) {
// filter check returned false
return false;
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/blobstorage/vdisk/defrag/defrag_search.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ namespace NKikimr {
TDataMerger Merger;
TKeyLogoBlob Key;
TMemRecLogoBlob MemRec;
ui32 NumMemRecsMerged;

public:
TDefragScanner(THullDsSnap&& fullSnap)
Expand Down Expand Up @@ -99,13 +98,11 @@ namespace NKikimr {
void AddFromFresh(const TMemRecLogoBlob& memRec, const TRope* /*data*/, const TKeyLogoBlob& key, ui64 lsn) {
Update(memRec, nullptr, lsn);
MemRec.Merge(memRec, key);
++NumMemRecsMerged;
}

void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, const TKeyLogoBlob& key, ui64 circaLsn) {
Update(memRec, outbound, circaLsn);
MemRec.Merge(memRec, key);
++NumMemRecsMerged;
}

static constexpr bool HaveToMergeData() { return false; }
Expand All @@ -114,13 +111,12 @@ namespace NKikimr {
void Start(const TKeyLogoBlob& key) {
Key = key;
MemRec = {};
NumMemRecsMerged = 0;
}

void Finish() {
if (!Merger.Empty()) {
Y_ABORT_UNLESS(!Merger.HasSmallBlobs());
NGc::TKeepStatus status = Barriers->Keep(Key, MemRec, NumMemRecsMerged, AllowKeepFlags, true /*allowGarbageCollection*/);
NGc::TKeepStatus status = Barriers->Keep(Key, MemRec, {}, AllowKeepFlags, true /*allowGarbageCollection*/);
const auto& hugeMerger = Merger.GetHugeBlobMerger();
const auto& local = MemRec.GetIngress().LocalParts(GType);
ui8 partIdx = local.FirstPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace NKikimr {

NGc::TKeepStatus TBarriersEssence::KeepLogoBlob(const TLogoBlobID &id,
const TIngress &ingress,
const ui32 recsMerged,
TKeepFlagStat keepFlagStat,
const bool allowKeepFlags,
bool allowGarbageCollection) const
{
Expand Down Expand Up @@ -88,17 +88,14 @@ namespace NKikimr {
// flags says us to keep the record?
const bool keepByFlags = ingress.KeepUnconditionally(IngressMode);

// is item is spread over multiple ssts?
const bool itemIsSpreadOverMultipleSsts = recsMerged > 1;

// check if we have to keep data associated with this blob
const bool keepData = (keepBySoftBarrier || keepByFlags) && keepByHardBarrier;

// check if we have to keep index data; when item is stored in multiple SSTables, we can't just drop one
// index and keep others as this index may contain Keep flag vital for blob consistency -- in case when
// we drop such record, we will lose keep flag and item will be occasionally on next compaction; anyway,
// if hard barrier tells us to drop this item, we drop it unconditinally
const bool spreadFactor = allowKeepFlags && itemIsSpreadOverMultipleSsts;
const bool spreadFactor = allowKeepFlags && keepFlagStat.Needed;
const bool keepIndex = (keepData || spreadFactor) && keepByHardBarrier;

return NGc::TKeepStatus(keepIndex, keepData, keepBySoftBarrier && keepByHardBarrier);
Expand Down
29 changes: 22 additions & 7 deletions ydb/core/blobstorage/vdisk/hulldb/barriers/barriers_essence.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@ namespace NKikimr {

namespace NGcOpt {

struct TKeepFlagStat {
bool Needed = false;

TKeepFlagStat() = default;

template<typename TKey, typename TMemRec>
TKeepFlagStat(const TRecordMergerBase<TKey, TMemRec>& subs, const TRecordMergerBase<TKey, TMemRec>& whole)
: Needed(subs.GetNumDoNotKeepFlags() == whole.GetNumDoNotKeepFlags() && // DoNotKeep flag only in this record
subs.GetNumKeepFlags() < whole.GetNumKeepFlags()) // and Keep flag somewhere else
{
// Needed is set to true when we are going to compact this record, but this is the only metadata record that
// contains DoNotKeep flag for the blob; in this case we have to keep the record without any data to prevent
// DoNotKeep from vanishing; this flag is used only when the blob is deletable (i.e. has no flags at all, or
// has both Keep and DoNotKeep, and also it is beyond the soft barrier)
}
};

//////////////////////////////////////////////////////////////////////////////////////////
// TBarriersEssence
//////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -34,17 +51,15 @@ namespace NGcOpt {

NGc::TKeepStatus Keep(const TKeyLogoBlob &key,
const TMemRecLogoBlob &memRec,
ui32 recsMerged,
TKeepFlagStat keepFlagStat,
bool allowKeepFlags,
bool allowGarbageCollection) const {
const TIngress ingress = memRec.GetIngress();
Y_DEBUG_ABORT_UNLESS(recsMerged >= 1);
return KeepLogoBlob(key.LogoBlobID(), ingress, recsMerged, allowKeepFlags, allowGarbageCollection);
return KeepLogoBlob(key.LogoBlobID(), memRec.GetIngress(), keepFlagStat, allowKeepFlags, allowGarbageCollection);
}

NGc::TKeepStatus Keep(const TKeyBlock& /*key*/,
const TMemRecBlock& /*memRec*/,
ui32 /*recsMerged*/,
TKeepFlagStat /*keepFlagStat*/,
bool /*allowKeepFlags*/,
bool /*allowGarbageCollection*/) const {
// NOTE: We never delete block records, we only merge them. Merge rules are
Expand All @@ -55,7 +70,7 @@ namespace NGcOpt {

NGc::TKeepStatus Keep(const TKeyBarrier& key,
const TMemRecBarrier& /*memRec*/,
ui32 /*recsMerged*/,
TKeepFlagStat /*keepFlagStat*/,
bool /*allowKeepFlags*/,
bool /*allowGarbageCollection*/) const {
return KeepBarrier(key);
Expand All @@ -77,7 +92,7 @@ namespace NGcOpt {
NGc::TKeepStatus KeepBarrier(const TKeyBarrier &key) const;
NGc::TKeepStatus KeepLogoBlob(const TLogoBlobID &id,
const TIngress &ingress,
const ui32 recsMerged,
TKeepFlagStat keepFlagStat,
const bool allowKeepFlags,
bool allowGarbageCollection) const;
};
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/blobstorage/vdisk/hulldb/blobstorage_hullgcmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ namespace NKikimr {
Y_UNUSED(subsMerger);
bool allowKeepFlags = HullCtx->AllowKeepFlags;
NGc::TKeepStatus keep = barriersEssence->Keep(dbIt.GetCurKey(), dbMerger.GetMemRec(),
dbMerger.GetMemRecsMerged(), allowKeepFlags,
AllowGarbageCollection);
{subsMerger, dbMerger}, allowKeepFlags, AllowGarbageCollection);
Stat.Update(dbIt.GetCurKey(), keep);
if (keep.KeepIndex) {
IndexKeepMap.Set(Stat.ItemsTotal - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ namespace NKikimr {
bool allowKeepFlags = HullCtx->AllowKeepFlags;
NGc::TKeepStatus keep = BarriersEssence->Keep(dbIt.GetCurKey(),
dbMerger.GetMemRec(),
dbMerger.GetMemRecsMerged(),
{subsMerger, dbMerger},
allowKeepFlags,
AllowGarbageCollection);
if (keep.KeepIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,40 @@ namespace NKikimr {
return MergeData;
}

ui32 GetMemRecsMerged() const {
return MemRecsMerged;
}
ui32 GetNumKeepFlags() const { return NumKeepFlags; }
ui32 GetNumDoNotKeepFlags() const { return NumDoNotKeepFlags >> 1; }

protected:
const TBlobStorageGroupType GType;
TMemRec MemRec;
ui32 MemRecsMerged; // number of items that took part in a merge for the current key
ui32 MemRecsMerged = 0; // number of items that took part in a merge for the current key
ui32 NumKeepFlags = 0;
ui32 NumDoNotKeepFlags = 0;
bool Finished;
const bool MergeData;

TRecordMergerBase(const TBlobStorageGroupType &gtype, bool mergeData)
: GType(gtype)
, MemRec()
, MemRecsMerged(0)
, Finished(false)
, MergeData(mergeData)
{}

void Clear() {
MemRecsMerged = 0;
NumKeepFlags = 0;
NumDoNotKeepFlags = 0;
Finished = false;
}

void AddBasic(const TMemRec &memRec, const TKey &key) {
if constexpr (std::is_same_v<TMemRec, TMemRecLogoBlob>) {
const int mode = memRec.GetIngress().GetCollectMode(TIngress::IngressMode(GType));
static_assert(CollectModeKeep == 1);
static_assert(CollectModeDoNotKeep == 2);
NumKeepFlags += mode & CollectModeKeep;
NumDoNotKeepFlags += mode & CollectModeDoNotKeep;
}
if (MemRecsMerged == 0) {
MemRec = memRec;
MemRec.SetNoBlob();
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idxsnap_it.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ namespace NKikimr {
const TMemRec &GetMemRec() const {
return Merger.GetMemRec();
}

ui32 GetMemRecsMerged() const {
return Merger.GetMemRecsMerged();
}
};

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -155,10 +151,6 @@ namespace NKikimr {
const TMemRec &GetMemRec() const {
return Merger.GetMemRec();
}

ui32 GetMemRecsMerged() const {
return Merger.GetMemRecsMerged();
}
};
} // NKikimr

4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/query/query_extr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ namespace NKikimr {

template<typename TMerger>
bool IsBlobDeleted(const TLogoBlobID &id, const TMerger &merger) {
const auto &status = BarriersEssence->Keep(id, merger.GetMemRec(), merger.GetMemRecsMerged(),
QueryCtx->HullCtx->AllowKeepFlags, true /*allowGarbageCollection*/);
const auto &status = BarriersEssence->Keep(id, merger.GetMemRec(), {}, QueryCtx->HullCtx->AllowKeepFlags,
true /*allowGarbageCollection*/);
return !status.KeepData;
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/query/query_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ namespace NKikimr {

template<typename TMerger>
void AddIndexOnly(const TLogoBlobID &logoBlobId, const TMerger &merger) {
const auto &status = BarriersEssence->Keep(logoBlobId, merger.GetMemRec(), merger.GetMemRecsMerged(),
QueryCtx->HullCtx->AllowKeepFlags, true /*allowGarbageCollection*/);
const auto &status = BarriersEssence->Keep(logoBlobId, merger.GetMemRec(), {},
QueryCtx->HullCtx->AllowKeepFlags, true /*allowGarbageCollection*/);
if (status.KeepData) {
const TIngress &ingress = merger.GetMemRec().GetIngress();
ui64 ingr = ingress.Raw();
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ namespace NKikimr {
const TIngress ingress = memRec.GetIngress();
const auto parts = ingress.PartsWeMustHaveLocally(&topology, ReplCtx->VCtx->ShortSelfVDisk,
StartKey) - ingress.LocalParts(topology.GType);
if (!parts.Empty() && barriers->Keep(StartKey, memRec, it.GetMemRecsMerged(), allowKeepFlags,
if (!parts.Empty() && barriers->Keep(StartKey, memRec, {}, allowKeepFlags,
true /*allowGarbageCollection*/).KeepData) {
++ReplInfo->ItemsTotal;
ReplInfo->WorkUnitsTotal += StartKey.BlobSize();
Expand Down Expand Up @@ -168,8 +168,7 @@ namespace NKikimr {
return false; // nothing to recover
}

const NGc::TKeepStatus status = barriers.Keep(key, it.GetMemRec(), it.GetMemRecsMerged(), allowKeepFlags,
true /*allowGarbageCollection*/);
const NGc::TKeepStatus status = barriers.Keep(key, it.GetMemRec(), {}, allowKeepFlags, true /*allowGarbageCollection*/);
if (!status.KeepData) {
return false; // no need to recover
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ namespace NKikimr {

iter.PutToMerger(&indexMerger);
indexMerger.Finish();
auto status = essence->Keep(iter.GetCurKey(), indexMerger.GetMemRec(), indexMerger.GetMemRecsMerged(),
Snap->HullCtx->AllowKeepFlags, true /*allowGarbageCollection*/);
auto status = essence->Keep(iter.GetCurKey(), indexMerger.GetMemRec(), {}, Snap->HullCtx->AllowKeepFlags,
true /*allowGarbageCollection*/);
indexMerger.Clear();

const TLogoBlobID& id = iter.GetCurKey().LogoBlobID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ namespace NKikimr {
Merger.Finish();

// obtain keep status
NGc::TKeepStatus status = Essence->Keep(id, Merger.GetMemRec(), Merger.GetMemRecsMerged(), AllowKeepFlags,
true /*allowGarbageCollection*/);
NGc::TKeepStatus status = Essence->Keep(id, Merger.GetMemRec(), {}, AllowKeepFlags, true /*allowGarbageCollection*/);

// clear merger for next operation
Merger.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ namespace NKikimr {
if (iter.Seek(id); iter.Valid() && iter.GetCurKey().LogoBlobID() == id) {
iter.PutToMerger(&merger);
merger.Finish();
keepData = barriers.Keep(id, merger.GetMemRec(), merger.GetMemRecsMerged(), snap.HullCtx->AllowKeepFlags,
keepData = barriers.Keep(id, merger.GetMemRec(), {}, snap.HullCtx->AllowKeepFlags,
true /*allowGarbageCollection*/).KeepData;
merger.Clear();
}
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ namespace NKikimr {

bool Check(const TKeyLogoBlob &key,
const TMemRecLogoBlob &memRec,
ui32 recsMerged,
bool allowKeepFlags,
bool allowGarbageCollection) const {
return TLogoBlobFilter::Check(key.LogoBlobID()) &&
BarriersEssence->Keep(key, memRec, recsMerged, allowKeepFlags, allowGarbageCollection).KeepData;
return TLogoBlobFilter::Check(key.LogoBlobID()) && BarriersEssence->Keep(key, memRec, {},
allowKeepFlags, allowGarbageCollection).KeepData;
}

TIntrusivePtr<THullCtx> HullCtx;
Expand Down Expand Up @@ -151,7 +150,7 @@ namespace NKikimr {
// copy data until we have some space
while (it.Valid() && (data->size() + NSyncLog::MaxRecFullSize <= data->capacity())) {
key = it.GetCurKey();
if (filter.Check(key, it.GetMemRec(), it.GetMemRecsMerged(), HullCtx->AllowKeepFlags, true /*allowGarbageCollection*/))
if (filter.Check(key, it.GetMemRec(), HullCtx->AllowKeepFlags, true /*allowGarbageCollection*/))
Serialize(ctx, data, key, it.GetMemRec());
it.Next();
}
Expand Down