Skip to content

Finish implementing shred logic in VDisk #14534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ struct TEvBlobStorage {
EvHugeShredNotify,
EvHugeShredNotifyResult,
EvNotifyChunksDeleted,
EvListChunks,
EvListChunksResult,
EvHugeQueryForbiddenChunks,
EvHugeForbiddenChunks,

EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ void TNodeWarden::Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr
auto issueShredRequestToPDisk = [&] {
const ui64 cookie = ++LastShredCookie;
ShredInFlight.emplace(cookie, key);
pdisk.ShredCookies.insert(cookie);
pdisk.ShredCookies.emplace(cookie, generation);

const TActorId actorId = SelfId();
auto ev = std::make_unique<NPDisk::TEvShredPDisk>(generation);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace NKikimr::NStorage {

std::optional<ui64> ShredGenerationIssued;
std::variant<std::monostate, ui64, TString> ShredState; // not issued, finished with generation, aborted
THashSet<ui64> ShredCookies;
THashMap<ui64, ui64> ShredCookies;

TPDiskRecord(NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk record)
: Record(std::move(record))
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ namespace NKikimr::NStorage {
Y_ABORT_UNLESS(jt != PDiskByPath.end() && jt->second.RunningPDiskId == it->first);
pending = std::move(jt->second.Pending);
PDiskByPath.erase(jt);
for (ui64 cookie : it->second.ShredCookies) {
ShredInFlight.erase(cookie);
auto& cookies = it->second.ShredCookies;
for (auto it = cookies.begin(); it != cookies.end(); ) {
const auto& [cookie, generation] = *it++;
ProcessShredStatus(cookie, generation, "pdisk has been restarted");
}
LocalPDisks.erase(it);
PDiskRestartInFlight.erase(pdiskId);
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,8 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
Become(&TThis::StateNormal);
}

void Ignore() {}

STRICT_STFUNC(StateNormal,
hFunc(NPDisk::TEvYardInit, Handle);
hFunc(NPDisk::TEvLog, Handle);
Expand All @@ -1010,6 +1012,8 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
hFunc(NPDisk::TEvWriteMetadata, Handle);

cFunc(EvBecomeError, HandleMoveToErrorState);

cFunc(TEvBlobStorage::EvMarkDirty, Ignore);
)

STRICT_STFUNC(StateError,
Expand All @@ -1030,6 +1034,8 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {

cFunc(TEvents::TSystem::Wakeup, ReportMetrics);
cFunc(EvBecomeNormal, HandleMoveToNormalState);

cFunc(TEvBlobStorage::EvMarkDirty, Ignore);
)
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ class THugeModuleTestActor : public TActorBootstrapped<THugeModuleTestActor> {

STRICT_STFUNC(StateWorking,
HFunc(TEvHullLogHugeBlob, Handle);
cFunc(TEvBlobStorage::EvNotifyChunksDeleted, []{});
)

public:
Expand Down
17 changes: 11 additions & 6 deletions ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ namespace NKikimr {
};

struct TEvHullShredDefrag : TEventLocal<TEvHullShredDefrag, TEvBlobStorage::EvHullShredDefrag> {
std::vector<TChunkIdx> ChunksToShred;
TEvHullShredDefrag(std::vector<TChunkIdx> chunksToShred) : ChunksToShred(std::move(chunksToShred)) {}
THashSet<TChunkIdx> ChunksToShred;
TEvHullShredDefrag(THashSet<TChunkIdx> chunksToShred) : ChunksToShred(std::move(chunksToShred)) {}
};

struct TEvHullShredDefragResult : TEventLocal<TEvHullShredDefragResult, TEvBlobStorage::EvHullShredDefragResult> {
Expand All @@ -171,11 +171,16 @@ namespace NKikimr {
: Lsn(lsn)
, Chunks(std::move(chunks))
{}
};

TEvNotifyChunksDeleted(const TEvNotifyChunksDeleted& x)
: Lsn(x.Lsn)
, Chunks(x.Chunks)
{}
struct TEvListChunks : TEventLocal<TEvListChunks, TEvBlobStorage::EvListChunks> {
THashSet<TChunkIdx> ChunksOfInterest;
TEvListChunks(THashSet<TChunkIdx> chunksOfInterest) : ChunksOfInterest(std::move(chunksOfInterest)) {}
};

struct TEvListChunksResult : TEventLocal<TEvListChunksResult, TEvBlobStorage::EvListChunksResult> {
THashSet<TChunkIdx> ChunksHuge;
THashSet<TChunkIdx> ChunksSyncLog;
};

} // NKikimr
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ namespace NKikimr {
RunDefragIfAny(ctx);
}

void Handle(TEvNotifyChunksDeleted::TPtr ev, const TActorContext& /*ctx*/) {
for (TTask& task : WaitQueue) {
if (auto *ptr = std::get_if<TEvHullShredDefrag::TPtr>(&task.Request)) {
for (const TChunkIdx chunkId : ev->Get()->Chunks) {
(*ptr)->Get()->ChunksToShred.erase(chunkId);
}
}
}
}

void Die(const TActorContext& ctx) override {
ActiveActors.KillAndClear(ctx);
TActorBootstrapped::Die(ctx);
Expand Down Expand Up @@ -447,6 +457,7 @@ namespace NKikimr {
HFunc(TEvSublogLine, Handle)
HFunc(TEvDefragStartQuantum, Handle)
HFunc(TEvDefragQuantumResult, Handle)
HFunc(TEvNotifyChunksDeleted, Handle)
);

public:
Expand Down
35 changes: 30 additions & 5 deletions ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,9 @@ namespace NKikimr {
Yield();
}
ChunksToDefrag.emplace(findChunks.GetChunksToDefrag(DCtx->MaxChunksToDefrag));
} else {
Y_ABORT_UNLESS(*ChunksToDefrag || ChunksToDefrag->IsShred());
}
if (*ChunksToDefrag || ChunksToDefrag->IsShred()) {
const bool isShred = ChunksToDefrag->IsShred();
if (*ChunksToDefrag || ChunksToDefrag->IsShred) {
const bool isShred = ChunksToDefrag->IsShred;

TDefragChunks lockedChunks;

Expand All @@ -97,8 +95,27 @@ namespace NKikimr {
STLOG(PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD11, DCtx->VCtx->VDiskLogPrefix << "locked chunks",
(ActorId, SelfActorId), (LockedChunks, lockedChunks));
} else {
auto forbiddenChunks = GetForbiddenChunks();

STLOG(PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD14, DCtx->VCtx->VDiskLogPrefix
<< "commencing shredding", (ActorId, SelfActorId), (ChunksToShred, ChunksToDefrag->ChunksToShred));
<< "commencing shredding", (ActorId, SelfActorId), (ChunksToShred, ChunksToDefrag->ChunksToShred),
(ForbiddenChunks, forbiddenChunks));

// filter chunks to shred via forbidden chunks
auto& chunksToShred = ChunksToDefrag->ChunksToShred;
for (const TChunkIdx chunkIdx : std::exchange(chunksToShred, {})) {
if (forbiddenChunks.contains(chunkIdx)) {
chunksToShred.insert(chunkIdx);
}
}

// check if we have something remaining to process
if (chunksToShred.empty()) {
STLOG(PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD15, DCtx->VCtx->VDiskLogPrefix << "nothing to do",
(ActorId, SelfActorId), (Stat, stat));
Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat)));
return;
}
}

TDefragQuantumFindRecords findRecords(std::move(*ChunksToDefrag), lockedChunks);
Expand Down Expand Up @@ -126,6 +143,7 @@ namespace NKikimr {
auto pred = [&](const auto& record) { return !set.contains(record.OldDiskPart.ChunkIdx); };
auto range = std::ranges::remove_if(records, pred);
records.erase(range.begin(), range.end());
findRecords.SetLockedChunks(std::move(set));
}

auto getSortedChunks = [&] {
Expand Down Expand Up @@ -198,6 +216,13 @@ namespace NKikimr {
return res->Get()->LockedChunks;
}

THashSet<TChunkIdx> GetForbiddenChunks() {
TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvHugeQueryForbiddenChunks, 0, DCtx->HugeKeeperId,
SelfActorId, nullptr, 0));
auto res = WaitForSpecificEvent<TEvHugeForbiddenChunks>(&TDefragQuantum::ProcessUnexpectedEvent);
return res->Get()->ForbiddenChunks;
}

void Compact(THashSet<ui64> tablesToCompact, bool needsFreshCompaction) {
if (tablesToCompact) {
Send(DCtx->SkeletonId, TEvCompactVDisk::Create(EHullDbType::LogoBlobs, std::move(tablesToCompact)));
Expand Down
20 changes: 11 additions & 9 deletions ydb/core/blobstorage/vdisk/defrag/defrag_search.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ namespace NKikimr {
TDefragChunks Chunks;
ui32 FoundChunksToDefrag = 0;
ui64 EstimatedSlotsCount = 0;
bool IsShred = false;
THashSet<TChunkIdx> ChunksToShred;

static TChunksToDefrag Shred(const auto& chunks) {
TChunksToDefrag res;
res.ChunksToShred = {chunks.begin(), chunks.end()};
return res;
static TChunksToDefrag Shred(THashSet<TChunkIdx> chunksToShred) {
return {
.IsShred = true,
.ChunksToShred = std::move(chunksToShred),
};
}

void Output(IOutputStream &str) const {
Expand All @@ -40,10 +42,6 @@ namespace NKikimr {
explicit operator bool() const {
return !Chunks.empty();
}

bool IsShred() const {
return !ChunksToShred.empty();
}
};

struct TDefragRecord {
Expand Down Expand Up @@ -371,7 +369,7 @@ namespace NKikimr {

public:
TDefragQuantumFindRecords(TChunksToDefrag&& chunksToDefrag, const TDefragChunks& locked) {
if (chunksToDefrag.IsShred()) {
if (chunksToDefrag.IsShred) {
LockedChunks = Chunks = std::move(chunksToDefrag.ChunksToShred);
} else {
for (const auto& chunk : chunksToDefrag.Chunks) {
Expand All @@ -396,6 +394,10 @@ namespace NKikimr {
NeedsFreshCompaction = false;
}

void SetLockedChunks(THashSet<ui32> lockedChunks) {
LockedChunks = std::move(lockedChunks);
}

std::vector<TDefragRecord> GetRecordsToRewrite() { return std::move(RecsToRewrite); }
THashSet<ui64> GetTablesToCompact() { return std::move(TablesToCompact); }
bool GetNeedsFreshCompaction() const { return NeedsFreshCompaction; }
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
Y_ABORT_UNLESS(ev->Get()->ChunkIds.size() == 1);
ChunkId = ev->Get()->ChunkIds.front();
Lsn = HugeKeeperCtx->LsnMngr->AllocLsnForLocalUse().Point();
ctx.Send(HugeKeeperCtx->SkeletonId, new TEvNotifyChunksDeleted(Lsn, ev->Get()->ChunkIds));

LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "ChunkAllocator: reserved:"
" chunkId# %" PRIu32 " Lsn# %" PRIu64, ChunkId, Lsn));
Expand Down Expand Up @@ -1054,6 +1055,16 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
}
}

void Handle(TEvListChunks::TPtr ev, const TActorContext& ctx) {
auto response = std::make_unique<TEvListChunksResult>();
State.Pers->Heap->ListChunks(ev->Get()->ChunksOfInterest, response->ChunksHuge);
ctx.Send(ev->Sender, response.release(), 0, ev->Cookie);
}

void HandleQueryForbiddenChunks(TAutoPtr<IEventHandle> ev, const TActorContext& ctx) {
ctx.Send(ev->Sender, new TEvHugeForbiddenChunks(State.Pers->Heap->GetForbiddenChunks()), 0, ev->Cookie);
}

void Handle(NPDisk::TEvCutLog::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG(ctx, BS_LOGCUTTER,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
Expand Down Expand Up @@ -1212,6 +1223,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
HFunc(TEvHugeLockChunks, Handle)
HFunc(TEvHugeStat, Handle)
HFunc(TEvHugeShredNotify, Handle)
HFunc(TEvListChunks, Handle)
FFunc(TEvBlobStorage::EvHugeQueryForbiddenChunks, HandleQueryForbiddenChunks)
HFunc(NPDisk::TEvCutLog, Handle)
HFunc(NMon::TEvHttpInfo, Handle)
HFunc(TEvents::TEvPoisonPill, Handle)
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ namespace NKikimr {
TEvHugeShredNotify(std::vector<TChunkIdx> chunksToShred) : ChunksToShred(std::move(chunksToShred)) {}
};

struct TEvHugeForbiddenChunks : TEventLocal<TEvHugeForbiddenChunks, TEvBlobStorage::EvHugeForbiddenChunks> {
THashSet<TChunkIdx> ForbiddenChunks;
TEvHugeForbiddenChunks(THashSet<TChunkIdx> forbiddenChunks) : ForbiddenChunks(std::move(forbiddenChunks)) {}
};

////////////////////////////////////////////////////////////////////////////
// THugeKeeperCtx
////////////////////////////////////////////////////////////////////////////
Expand Down
31 changes: 25 additions & 6 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,16 @@ namespace NKikimr {
}
}

void TChain::ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks) {
for (auto& map : {FreeSpace, LockedChunks}) {
for (const auto& [chunkIdx, freeSpace] : map) {
if (chunksOfInterest.contains(chunkIdx)) {
chunks.insert(chunkIdx);
}
}
}
}

////////////////////////////////////////////////////////////////////////////
// TAllChains
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -662,6 +672,12 @@ namespace NKikimr {
}
}

void TAllChains::ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks) {
for (TChain& chain : Chains) {
chain.ListChunks(chunksOfInterest, chunks);
}
}

////////////////////////////////////////////////////////////////////////////
// THeap
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -752,11 +768,7 @@ namespace NKikimr {
return Chains.GetStat();
}

std::vector<ui32> THeap::ShredNotify(const std::vector<ui32>& chunksToShred) {
std::vector<ui32> chunksToDrop;
std::set_intersection(chunksToShred.begin(), chunksToShred.end(), FreeChunks.begin(), FreeChunks.end(),
std::back_inserter(chunksToDrop));

void THeap::ShredNotify(const std::vector<ui32>& chunksToShred) {
Chains.ShredNotify(chunksToShred);

ForbiddenChunks.insert(chunksToShred.begin(), chunksToShred.end());
Expand All @@ -769,8 +781,15 @@ namespace NKikimr {
++it;
}
}
}

return chunksToDrop;
void THeap::ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks) {
for (const TChunkIdx chunkIdx : FreeChunks) {
if (chunksOfInterest.contains(chunkIdx)) {
chunks.insert(chunkIdx);
}
}
Chains.ListChunks(chunksOfInterest, chunks);
}

//////////////////////////////////////////////////////////////////////////////////////////
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ namespace NKikimr {
void RenderHtmlForUsage(IOutputStream &str) const;
void GetOwnedChunks(TSet<TChunkIdx>& chunks) const;
void ShredNotify(const std::vector<ui32>& chunksToShred);
void ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks);

static TChain Load(IInputStream *s, TString vdiskLogPrefix, ui32 appendBlockSize, ui32 blocksInChunk);

Expand Down Expand Up @@ -202,6 +203,7 @@ namespace NKikimr {

void FinishRecovery();
void ShredNotify(const std::vector<ui32>& chunksToShred);
void ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks);

private:
void BuildChains();
Expand Down Expand Up @@ -279,7 +281,9 @@ namespace NKikimr {
// make chunk not available for allocations, it is used for heap defragmentation
bool LockChunkForAllocation(ui32 chunkId, ui32 slotSize);
THeapStat GetStat() const;
std::vector<ui32> ShredNotify(const std::vector<ui32>& chunksToShred);
void ShredNotify(const std::vector<ui32>& chunksToShred);
void ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks);
THashSet<TChunkIdx> GetForbiddenChunks() const { return ForbiddenChunks; }

//////////////////////////////////////////////////////////////////////////////////////////
// RecoveryMode
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ namespace NKikimr {
THullCommitFinished::TypeToString(NotifyType), CommitMsg->CommitRecord.ToString().data(),
Metadata.RemovedHugeBlobs.ToString().data()));

// notify PDisk about dirty chunks (the ones from which huge slots are being freed right now)
THashSet<TChunkIdx> chunkIds;
for (const TDiskPart& p : Metadata.RemovedHugeBlobs) {
chunkIds.insert(p.ChunkIdx);
}
if (chunkIds) {
// TODO(alexvru): uncommit when PDisk stops breaking tests when this is enabled
// ctx.Send(Ctx->PDiskCtx->PDiskId, new NPDisk::TEvMarkDirty(Ctx->PDiskCtx->Dsk->Owner,
// Ctx->PDiskCtx->Dsk->OwnerRound, {chunkIds.begin(), chunkIds.end()}));
}

ctx.Send(Ctx->LoggerId, CommitMsg.release());
}

Expand Down
Loading
Loading