Skip to content

Commit a70a88e

Browse files
committed
Finish implementing shred logic in VDisk
1 parent b9ac376 commit a70a88e

24 files changed

+429
-223
lines changed

ydb/core/base/blobstorage.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,10 @@ struct TEvBlobStorage {
752752
EvHugeShredNotify,
753753
EvHugeShredNotifyResult,
754754
EvNotifyChunksDeleted,
755+
EvListChunks,
756+
EvListChunksResult,
757+
EvHugeQueryForbiddenChunks,
758+
EvHugeForbiddenChunks,
755759

756760
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
757761
EvLogResult,

ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ void TNodeWarden::Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr
671671
auto issueShredRequestToPDisk = [&] {
672672
const ui64 cookie = ++LastShredCookie;
673673
ShredInFlight.emplace(cookie, key);
674-
pdisk.ShredCookies.insert(cookie);
674+
pdisk.ShredCookies.emplace(cookie, generation);
675675

676676
const TActorId actorId = SelfId();
677677
auto ev = std::make_unique<NPDisk::TEvShredPDisk>(generation);

ydb/core/blobstorage/nodewarden/node_warden_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ namespace NKikimr::NStorage {
8484

8585
std::optional<ui64> ShredGenerationIssued;
8686
std::variant<std::monostate, ui64, TString> ShredState; // not issued, finished with generation, aborted
87-
THashSet<ui64> ShredCookies;
87+
THashMap<ui64, ui64> ShredCookies;
8888

8989
TPDiskRecord(NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk record)
9090
: Record(std::move(record))

ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,10 @@ namespace NKikimr::NStorage {
224224
Y_ABORT_UNLESS(jt != PDiskByPath.end() && jt->second.RunningPDiskId == it->first);
225225
pending = std::move(jt->second.Pending);
226226
PDiskByPath.erase(jt);
227-
for (ui64 cookie : it->second.ShredCookies) {
228-
ShredInFlight.erase(cookie);
227+
auto& cookies = it->second.ShredCookies;
228+
for (auto it = cookies.begin(); it != cookies.end(); ) {
229+
const auto& [cookie, generation] = *it++;
230+
ProcessShredStatus(cookie, generation, "pdisk has been restarted");
229231
}
230232
LocalPDisks.erase(it);
231233
PDiskRestartInFlight.erase(pdiskId);

ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,8 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
989989
Become(&TThis::StateNormal);
990990
}
991991

992+
void Ignore() {}
993+
992994
STRICT_STFUNC(StateNormal,
993995
hFunc(NPDisk::TEvYardInit, Handle);
994996
hFunc(NPDisk::TEvLog, Handle);
@@ -1010,6 +1012,8 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
10101012
hFunc(NPDisk::TEvWriteMetadata, Handle);
10111013

10121014
cFunc(EvBecomeError, HandleMoveToErrorState);
1015+
1016+
cFunc(TEvBlobStorage::EvMarkDirty, Ignore);
10131017
)
10141018

10151019
STRICT_STFUNC(StateError,
@@ -1030,6 +1034,8 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
10301034

10311035
cFunc(TEvents::TSystem::Wakeup, ReportMetrics);
10321036
cFunc(EvBecomeNormal, HandleMoveToNormalState);
1037+
1038+
cFunc(TEvBlobStorage::EvMarkDirty, Ignore);
10331039
)
10341040
};
10351041

ydb/core/blobstorage/ut_vdisk/lib/test_huge.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ class THugeModuleTestActor : public TActorBootstrapped<THugeModuleTestActor> {
293293

294294
STRICT_STFUNC(StateWorking,
295295
HFunc(TEvHullLogHugeBlob, Handle);
296+
cFunc(TEvBlobStorage::EvNotifyChunksDeleted, []{});
296297
)
297298

298299
public:

ydb/core/blobstorage/vdisk/common/vdisk_private_events.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ namespace NKikimr {
156156
};
157157

158158
struct TEvHullShredDefrag : TEventLocal<TEvHullShredDefrag, TEvBlobStorage::EvHullShredDefrag> {
159-
std::vector<TChunkIdx> ChunksToShred;
160-
TEvHullShredDefrag(std::vector<TChunkIdx> chunksToShred) : ChunksToShred(std::move(chunksToShred)) {}
159+
THashSet<TChunkIdx> ChunksToShred;
160+
TEvHullShredDefrag(THashSet<TChunkIdx> chunksToShred) : ChunksToShred(std::move(chunksToShred)) {}
161161
};
162162

163163
struct TEvHullShredDefragResult : TEventLocal<TEvHullShredDefragResult, TEvBlobStorage::EvHullShredDefragResult> {
@@ -171,11 +171,16 @@ namespace NKikimr {
171171
: Lsn(lsn)
172172
, Chunks(std::move(chunks))
173173
{}
174+
};
174175

175-
TEvNotifyChunksDeleted(const TEvNotifyChunksDeleted& x)
176-
: Lsn(x.Lsn)
177-
, Chunks(x.Chunks)
178-
{}
176+
struct TEvListChunks : TEventLocal<TEvListChunks, TEvBlobStorage::EvListChunks> {
177+
THashSet<TChunkIdx> ChunksOfInterest;
178+
TEvListChunks(THashSet<TChunkIdx> chunksOfInterest) : ChunksOfInterest(std::move(chunksOfInterest)) {}
179+
};
180+
181+
struct TEvListChunksResult : TEventLocal<TEvListChunksResult, TEvBlobStorage::EvListChunksResult> {
182+
THashSet<TChunkIdx> ChunksHuge;
183+
THashSet<TChunkIdx> ChunksSyncLog;
179184
};
180185

181186
} // NKikimr

ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,16 @@ namespace NKikimr {
331331
RunDefragIfAny(ctx);
332332
}
333333

334+
void Handle(TEvNotifyChunksDeleted::TPtr ev, const TActorContext& /*ctx*/) {
335+
for (TTask& task : WaitQueue) {
336+
if (auto *ptr = std::get_if<TEvHullShredDefrag::TPtr>(&task.Request)) {
337+
for (const TChunkIdx chunkId : ev->Get()->Chunks) {
338+
(*ptr)->Get()->ChunksToShred.erase(chunkId);
339+
}
340+
}
341+
}
342+
}
343+
334344
void Die(const TActorContext& ctx) override {
335345
ActiveActors.KillAndClear(ctx);
336346
TActorBootstrapped::Die(ctx);
@@ -447,6 +457,7 @@ namespace NKikimr {
447457
HFunc(TEvSublogLine, Handle)
448458
HFunc(TEvDefragStartQuantum, Handle)
449459
HFunc(TEvDefragQuantumResult, Handle)
460+
HFunc(TEvNotifyChunksDeleted, Handle)
450461
);
451462

452463
public:

ydb/core/blobstorage/vdisk/defrag/defrag_quantum.cpp

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,9 @@ namespace NKikimr {
7676
Yield();
7777
}
7878
ChunksToDefrag.emplace(findChunks.GetChunksToDefrag(DCtx->MaxChunksToDefrag));
79-
} else {
80-
Y_ABORT_UNLESS(*ChunksToDefrag || ChunksToDefrag->IsShred());
8179
}
82-
if (*ChunksToDefrag || ChunksToDefrag->IsShred()) {
83-
const bool isShred = ChunksToDefrag->IsShred();
80+
if (*ChunksToDefrag || ChunksToDefrag->IsShred) {
81+
const bool isShred = ChunksToDefrag->IsShred;
8482

8583
TDefragChunks lockedChunks;
8684

@@ -97,8 +95,27 @@ namespace NKikimr {
9795
STLOG(PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD11, DCtx->VCtx->VDiskLogPrefix << "locked chunks",
9896
(ActorId, SelfActorId), (LockedChunks, lockedChunks));
9997
} else {
98+
auto forbiddenChunks = GetForbiddenChunks();
99+
100100
STLOG(PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD14, DCtx->VCtx->VDiskLogPrefix
101-
<< "commencing shredding", (ActorId, SelfActorId), (ChunksToShred, ChunksToDefrag->ChunksToShred));
101+
<< "commencing shredding", (ActorId, SelfActorId), (ChunksToShred, ChunksToDefrag->ChunksToShred),
102+
(ForbiddenChunks, forbiddenChunks));
103+
104+
// filter chunks to shred via forbidden chunks
105+
auto& chunksToShred = ChunksToDefrag->ChunksToShred;
106+
for (const TChunkIdx chunkIdx : std::exchange(chunksToShred, {})) {
107+
if (forbiddenChunks.contains(chunkIdx)) {
108+
chunksToShred.insert(chunkIdx);
109+
}
110+
}
111+
112+
// check if we have something remaining to process
113+
if (chunksToShred.empty()) {
114+
STLOG(PRI_DEBUG, BS_VDISK_DEFRAG, BSVDD15, DCtx->VCtx->VDiskLogPrefix << "nothing to do",
115+
(ActorId, SelfActorId), (Stat, stat));
116+
Send(ParentActorId, new TEvDefragQuantumResult(std::move(stat)));
117+
return;
118+
}
102119
}
103120

104121
TDefragQuantumFindRecords findRecords(std::move(*ChunksToDefrag), lockedChunks);
@@ -126,6 +143,7 @@ namespace NKikimr {
126143
auto pred = [&](const auto& record) { return !set.contains(record.OldDiskPart.ChunkIdx); };
127144
auto range = std::ranges::remove_if(records, pred);
128145
records.erase(range.begin(), range.end());
146+
findRecords.SetLockedChunks(std::move(set));
129147
}
130148

131149
auto getSortedChunks = [&] {
@@ -198,6 +216,13 @@ namespace NKikimr {
198216
return res->Get()->LockedChunks;
199217
}
200218

219+
THashSet<TChunkIdx> GetForbiddenChunks() {
220+
TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvHugeQueryForbiddenChunks, 0, DCtx->HugeKeeperId,
221+
SelfActorId, nullptr, 0));
222+
auto res = WaitForSpecificEvent<TEvHugeForbiddenChunks>(&TDefragQuantum::ProcessUnexpectedEvent);
223+
return res->Get()->ForbiddenChunks;
224+
}
225+
201226
void Compact(THashSet<ui64> tablesToCompact, bool needsFreshCompaction) {
202227
if (tablesToCompact) {
203228
Send(DCtx->SkeletonId, TEvCompactVDisk::Create(EHullDbType::LogoBlobs, std::move(tablesToCompact)));

ydb/core/blobstorage/vdisk/defrag/defrag_search.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ namespace NKikimr {
1818
TDefragChunks Chunks;
1919
ui32 FoundChunksToDefrag = 0;
2020
ui64 EstimatedSlotsCount = 0;
21+
bool IsShred = false;
2122
THashSet<TChunkIdx> ChunksToShred;
2223

23-
static TChunksToDefrag Shred(const auto& chunks) {
24-
TChunksToDefrag res;
25-
res.ChunksToShred = {chunks.begin(), chunks.end()};
26-
return res;
24+
static TChunksToDefrag Shred(THashSet<TChunkIdx> chunksToShred) {
25+
return {
26+
.IsShred = true,
27+
.ChunksToShred = std::move(chunksToShred),
28+
};
2729
}
2830

2931
void Output(IOutputStream &str) const {
@@ -40,10 +42,6 @@ namespace NKikimr {
4042
explicit operator bool() const {
4143
return !Chunks.empty();
4244
}
43-
44-
bool IsShred() const {
45-
return !ChunksToShred.empty();
46-
}
4745
};
4846

4947
struct TDefragRecord {
@@ -371,7 +369,7 @@ namespace NKikimr {
371369

372370
public:
373371
TDefragQuantumFindRecords(TChunksToDefrag&& chunksToDefrag, const TDefragChunks& locked) {
374-
if (chunksToDefrag.IsShred()) {
372+
if (chunksToDefrag.IsShred) {
375373
LockedChunks = Chunks = std::move(chunksToDefrag.ChunksToShred);
376374
} else {
377375
for (const auto& chunk : chunksToDefrag.Chunks) {
@@ -396,6 +394,10 @@ namespace NKikimr {
396394
NeedsFreshCompaction = false;
397395
}
398396

397+
void SetLockedChunks(THashSet<ui32> lockedChunks) {
398+
LockedChunks = std::move(lockedChunks);
399+
}
400+
399401
std::vector<TDefragRecord> GetRecordsToRewrite() { return std::move(RecsToRewrite); }
400402
THashSet<ui64> GetTablesToCompact() { return std::move(TablesToCompact); }
401403
bool GetNeedsFreshCompaction() const { return NeedsFreshCompaction; }

ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
311311
Y_ABORT_UNLESS(ev->Get()->ChunkIds.size() == 1);
312312
ChunkId = ev->Get()->ChunkIds.front();
313313
Lsn = HugeKeeperCtx->LsnMngr->AllocLsnForLocalUse().Point();
314+
ctx.Send(HugeKeeperCtx->SkeletonId, new TEvNotifyChunksDeleted(Lsn, ev->Get()->ChunkIds));
314315

315316
LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "ChunkAllocator: reserved:"
316317
" chunkId# %" PRIu32 " Lsn# %" PRIu64, ChunkId, Lsn));
@@ -1054,6 +1055,16 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
10541055
}
10551056
}
10561057

1058+
void Handle(TEvListChunks::TPtr ev, const TActorContext& ctx) {
1059+
auto response = std::make_unique<TEvListChunksResult>();
1060+
State.Pers->Heap->ListChunks(ev->Get()->ChunksOfInterest, response->ChunksHuge);
1061+
ctx.Send(ev->Sender, response.release(), 0, ev->Cookie);
1062+
}
1063+
1064+
void HandleQueryForbiddenChunks(TAutoPtr<IEventHandle> ev, const TActorContext& ctx) {
1065+
ctx.Send(ev->Sender, new TEvHugeForbiddenChunks(State.Pers->Heap->GetForbiddenChunks()), 0, ev->Cookie);
1066+
}
1067+
10571068
void Handle(NPDisk::TEvCutLog::TPtr &ev, const TActorContext &ctx) {
10581069
LOG_DEBUG(ctx, BS_LOGCUTTER,
10591070
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
@@ -1212,6 +1223,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
12121223
HFunc(TEvHugeLockChunks, Handle)
12131224
HFunc(TEvHugeStat, Handle)
12141225
HFunc(TEvHugeShredNotify, Handle)
1226+
HFunc(TEvListChunks, Handle)
1227+
FFunc(TEvBlobStorage::EvHugeQueryForbiddenChunks, HandleQueryForbiddenChunks)
12151228
HFunc(NPDisk::TEvCutLog, Handle)
12161229
HFunc(NMon::TEvHttpInfo, Handle)
12171230
HFunc(TEvents::TEvPoisonPill, Handle)

ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,11 @@ namespace NKikimr {
245245
TEvHugeShredNotify(std::vector<TChunkIdx> chunksToShred) : ChunksToShred(std::move(chunksToShred)) {}
246246
};
247247

248+
struct TEvHugeForbiddenChunks : TEventLocal<TEvHugeForbiddenChunks, TEvBlobStorage::EvHugeForbiddenChunks> {
249+
THashSet<TChunkIdx> ForbiddenChunks;
250+
TEvHugeForbiddenChunks(THashSet<TChunkIdx> forbiddenChunks) : ForbiddenChunks(std::move(forbiddenChunks)) {}
251+
};
252+
248253
////////////////////////////////////////////////////////////////////////////
249254
// THugeKeeperCtx
250255
////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,16 @@ namespace NKikimr {
388388
}
389389
}
390390

391+
void TChain::ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks) {
392+
for (auto& map : {FreeSpace, LockedChunks}) {
393+
for (const auto& [chunkIdx, freeSpace] : map) {
394+
if (chunksOfInterest.contains(chunkIdx)) {
395+
chunks.insert(chunkIdx);
396+
}
397+
}
398+
}
399+
}
400+
391401
////////////////////////////////////////////////////////////////////////////
392402
// TAllChains
393403
////////////////////////////////////////////////////////////////////////////
@@ -662,6 +672,12 @@ namespace NKikimr {
662672
}
663673
}
664674

675+
void TAllChains::ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks) {
676+
for (TChain& chain : Chains) {
677+
chain.ListChunks(chunksOfInterest, chunks);
678+
}
679+
}
680+
665681
////////////////////////////////////////////////////////////////////////////
666682
// THeap
667683
////////////////////////////////////////////////////////////////////////////
@@ -752,11 +768,7 @@ namespace NKikimr {
752768
return Chains.GetStat();
753769
}
754770

755-
std::vector<ui32> THeap::ShredNotify(const std::vector<ui32>& chunksToShred) {
756-
std::vector<ui32> chunksToDrop;
757-
std::set_intersection(chunksToShred.begin(), chunksToShred.end(), FreeChunks.begin(), FreeChunks.end(),
758-
std::back_inserter(chunksToDrop));
759-
771+
void THeap::ShredNotify(const std::vector<ui32>& chunksToShred) {
760772
Chains.ShredNotify(chunksToShred);
761773

762774
ForbiddenChunks.insert(chunksToShred.begin(), chunksToShred.end());
@@ -769,8 +781,15 @@ namespace NKikimr {
769781
++it;
770782
}
771783
}
784+
}
772785

773-
return chunksToDrop;
786+
void THeap::ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks) {
787+
for (const TChunkIdx chunkIdx : FreeChunks) {
788+
if (chunksOfInterest.contains(chunkIdx)) {
789+
chunks.insert(chunkIdx);
790+
}
791+
}
792+
Chains.ListChunks(chunksOfInterest, chunks);
774793
}
775794

776795
//////////////////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ namespace NKikimr {
150150
void RenderHtmlForUsage(IOutputStream &str) const;
151151
void GetOwnedChunks(TSet<TChunkIdx>& chunks) const;
152152
void ShredNotify(const std::vector<ui32>& chunksToShred);
153+
void ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks);
153154

154155
static TChain Load(IInputStream *s, TString vdiskLogPrefix, ui32 appendBlockSize, ui32 blocksInChunk);
155156

@@ -202,6 +203,7 @@ namespace NKikimr {
202203

203204
void FinishRecovery();
204205
void ShredNotify(const std::vector<ui32>& chunksToShred);
206+
void ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks);
205207

206208
private:
207209
void BuildChains();
@@ -279,7 +281,9 @@ namespace NKikimr {
279281
// make chunk not available for allocations, it is used for heap defragmentation
280282
bool LockChunkForAllocation(ui32 chunkId, ui32 slotSize);
281283
THeapStat GetStat() const;
282-
std::vector<ui32> ShredNotify(const std::vector<ui32>& chunksToShred);
284+
void ShredNotify(const std::vector<ui32>& chunksToShred);
285+
void ListChunks(const THashSet<TChunkIdx>& chunksOfInterest, THashSet<TChunkIdx>& chunks);
286+
THashSet<TChunkIdx> GetForbiddenChunks() const { return ForbiddenChunks; }
283287

284288
//////////////////////////////////////////////////////////////////////////////////////////
285289
// RecoveryMode

ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,16 @@ namespace NKikimr {
128128
THullCommitFinished::TypeToString(NotifyType), CommitMsg->CommitRecord.ToString().data(),
129129
Metadata.RemovedHugeBlobs.ToString().data()));
130130

131+
// notify PDisk about dirty chunks (the ones from which huge slots are being freed right now)
132+
THashSet<TChunkIdx> chunkIds;
133+
for (const TDiskPart& p : Metadata.RemovedHugeBlobs) {
134+
chunkIds.insert(p.ChunkIdx);
135+
}
136+
if (chunkIds) {
137+
ctx.Send(Ctx->PDiskCtx->PDiskId, new NPDisk::TEvMarkDirty(Ctx->PDiskCtx->Dsk->Owner,
138+
Ctx->PDiskCtx->Dsk->OwnerRound, {chunkIds.begin(), chunkIds.end()}));
139+
}
140+
131141
ctx.Send(Ctx->LoggerId, CommitMsg.release());
132142
}
133143

0 commit comments

Comments
 (0)