@@ -26,13 +26,15 @@ namespace NKikimr {
2626 TLogoBlobID LastKey;
2727 bool Eof;
2828 std::deque<TLogoBlobID> DroppedBlobs;
29+ TMilestoneQueue MilestoneQueue;
2930
3031 TEvReplPlanFinished (std::unique_ptr<TRecoveryMachine>&& recoveryMachine, const TLogoBlobID& lastKey, bool eof,
31- std::deque<TLogoBlobID>&& droppedBlobs)
32+ std::deque<TLogoBlobID>&& droppedBlobs, TMilestoneQueue&& milestoneQueue )
3233 : RecoveryMachine(std::move(recoveryMachine))
3334 , LastKey(lastKey)
3435 , Eof(eof)
3536 , DroppedBlobs(std::move(droppedBlobs))
37+ , MilestoneQueue(std::move(milestoneQueue))
3638 {}
3739 };
3840
@@ -52,6 +54,7 @@ namespace NKikimr {
5254 std::deque<TLogoBlobID> DroppedBlobs;
5355 ui64 QuantumBytes = 0 ;
5456 bool AddingTasks = true ;
57+ TMilestoneQueue MilestoneQueue;
5558
5659 public:
5760 void Bootstrap (const TActorId& parentId) {
@@ -106,43 +109,83 @@ namespace NKikimr {
106109 } else {
107110 // scan through the index until we have enough blobs to recover or the time is out
108111 const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx ->Top ;
109- for (it.Seek (StartKey); it.Valid (); it.Next ()) {
110- StartKey = it.GetCurKey ().LogoBlobID ();
112+
113+ if (StartKey == TLogoBlobID ()) {
114+ it.SeekToFirst ();
115+ } else {
116+ it.Seek (StartKey);
117+ if (it.Valid () && it.GetCurKey () == StartKey) {
118+ it.Next ();
119+ }
120+ }
121+
122+ auto checkRestart = [&] {
111123 if (++counter % 1024 == 0 && GetCycleCountFast () >= plannedEndTime) {
112124 // we have event processing timer expired, restart processing later with new snapshot starting
113125 // with current key
114126 Send (ReplCtx->SkeletonId , new TEvTakeHullSnapshot (true ));
115- return ;
116- } else if (AddingTasks) {
127+ return true ;
128+ }
129+ return false ;
130+ };
131+
132+ if (AddingTasks) {
133+ for (; it.Valid (); it.Next ()) {
134+ if (checkRestart ()) {
135+ return ;
136+ }
137+
138+ StartKey = it.GetCurKey ().LogoBlobID ();
139+
117140 // we still have some space in recovery machine logic, so we can add new item
118141 ProcessItem (it, *barriers, allowKeepFlags);
119- } else {
120- // no space in recovery machine logic, but we still have to count remaining work
121- const TMemRecLogoBlob memRec = it.GetMemRec ();
122- const TIngress ingress = memRec.GetIngress ();
123- const auto parts = ingress.PartsWeMustHaveLocally (&topology, ReplCtx->VCtx ->ShortSelfVDisk ,
124- StartKey) - ingress.LocalParts (topology.GType );
125- if (!parts.Empty () && barriers->Keep (StartKey, memRec, {}, allowKeepFlags,
126- true /* allowGarbageCollection*/ ).KeepData ) {
127- ++ReplInfo->ItemsTotal ;
128- ReplInfo->WorkUnitsTotal += StartKey.BlobSize ();
129- }
142+ MilestoneQueue.PopIfNeeded (StartKey);
130143
131- if (!KeyToResumeNextTime) {
132- // this is first valid key that is not processed with ProcessItem, so we remember it to
133- // start next quantum with this exact key
134- KeyToResumeNextTime.emplace (StartKey);
144+ if (!AddingTasks) { // we have finished adding tasks after this key, remember it
145+ it.Next ();
146+ Y_ABORT_UNLESS (!KeyToResumeNextTime);
147+ if (it.Valid ()) {
148+ KeyToResumeNextTime.emplace (it.GetCurKey ().LogoBlobID ());
149+ }
150+ break ;
135151 }
136152 }
153+ eof = !it.Valid (); // we finish this quantum when there are no more tasks to generate
137154 }
138155
139- // we shall run next quantum only if we have KeyToResumeNextTime filled in
140- eof = !KeyToResumeNextTime;
156+ for (; it.Valid (); it.Next ()) {
157+ if (checkRestart ()) {
158+ return ;
159+ }
160+
161+ StartKey = it.GetCurKey ().LogoBlobID ();
162+
163+ // check the milestone queue, if we have requested blob
164+ if (MilestoneQueue.Match (StartKey, &ReplInfo->ItemsTotal , &ReplInfo->WorkUnitsTotal )) {
165+ break ;
166+ }
167+
168+ // no space in recovery machine logic, but we still have to count remaining work
169+ const TMemRecLogoBlob memRec = it.GetMemRec ();
170+ const TIngress ingress = memRec.GetIngress ();
171+ const auto parts = ingress.PartsWeMustHaveLocally (&topology, ReplCtx->VCtx ->ShortSelfVDisk ,
172+ StartKey) - ingress.LocalParts (topology.GType );
173+ if (!parts.Empty () && barriers->Keep (StartKey, memRec, {}, allowKeepFlags,
174+ true /* allowGarbageCollection*/ ).KeepData ) {
175+ ++ReplInfo->ItemsTotal ;
176+ ReplInfo->WorkUnitsTotal += StartKey.BlobSize ();
177+ MilestoneQueue.Push (StartKey, StartKey.BlobSize ());
178+ }
179+ }
180+
181+ if (!it.Valid ()) {
182+ MilestoneQueue.Finish ();
183+ }
141184 }
142185
143186 // the planning stage has finished, issue reply to the job actor
144187 Send (Recipient, new TEvReplPlanFinished (std::move (RecoveryMachine), KeyToResumeNextTime.value_or (TLogoBlobID ()),
145- eof, std::move (DroppedBlobs)));
188+ eof, std::move (DroppedBlobs), std::move (MilestoneQueue) ));
146189
147190 // finish processing for this actor
148191 PassAway ();
@@ -219,13 +262,15 @@ namespace NKikimr {
219262 const TLogoBlobID &startKey,
220263 TEvReplFinished::TInfoPtr replInfo,
221264 TBlobIdQueuePtr blobsToReplicatePtr,
222- TBlobIdQueuePtr unreplicatedBlobsPtr)
265+ TBlobIdQueuePtr unreplicatedBlobsPtr,
266+ TMilestoneQueue milestoneQueue)
223267 : ReplCtx(std::move(replCtx))
224268 , GInfo(std::move(ginfo))
225269 , StartKey(startKey)
226270 , ReplInfo(replInfo)
227271 , BlobsToReplicatePtr(std::move(blobsToReplicatePtr))
228272 , UnreplicatedBlobsPtr(std::move(unreplicatedBlobsPtr))
273+ , MilestoneQueue(std::move(milestoneQueue))
229274 {}
230275 };
231276
@@ -265,6 +310,7 @@ namespace NKikimr {
265310 TBlobIdQueuePtr BlobsToReplicatePtr;
266311 TBlobIdQueuePtr UnreplicatedBlobsPtr;
267312 TUnreplicatedBlobRecords UnreplicatedBlobRecords;
313+ TMilestoneQueue MilestoneQueue;
268314 std::optional<std::pair<TVDiskID, TActorId>> Donor;
269315
270316 // parameters from planner
@@ -305,7 +351,7 @@ namespace NKikimr {
305351 for (const auto & proxy : DiskProxySet) {
306352 dropDonor = dropDonor && proxy && proxy->NoTransientErrors ();
307353 }
308- ReplInfo->Finish (LastKey, Eof, Donor && dropDonor, std::move (UnreplicatedBlobRecords));
354+ ReplInfo->Finish (LastKey, Eof, Donor && dropDonor, std::move (UnreplicatedBlobRecords), std::move (MilestoneQueue) );
309355
310356 TProxyStat stat;
311357 for (const TVDiskProxyPtr& p : DiskProxySet) {
@@ -326,7 +372,8 @@ namespace NKikimr {
326372 STLOG (PRI_DEBUG, BS_REPL, BSVR02, VDISKP (ReplCtx->VCtx ->VDiskLogPrefix , " THullReplJobActor::Bootstrap" ));
327373
328374 TimeAccount.SetState (ETimeState::PREPARE_PLAN);
329- auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr, UnreplicatedBlobsPtr);
375+ auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr,
376+ UnreplicatedBlobsPtr, std::move (MilestoneQueue));
330377 auto aid = RunInBatchPool (ActorContext (), actor.release ());
331378 ActiveActors.Insert (aid, __FILE__, __LINE__, TActivationContext::AsActorContext (), NKikimrServices::BLOBSTORAGE);
332379 Become (&TThis::StatePreparePlan);
@@ -338,6 +385,7 @@ namespace NKikimr {
338385 RecoveryMachine = std::move (ev->Get ()->RecoveryMachine );
339386 LastKey = ev->Get ()->LastKey ;
340387 Eof = ev->Get ()->Eof ;
388+ MilestoneQueue = std::move (ev->Get ()->MilestoneQueue );
341389
342390 for (const TLogoBlobID& id : ev->Get ()->DroppedBlobs ) {
343391 DropUnreplicatedBlobRecord (id);
@@ -940,7 +988,8 @@ namespace NKikimr {
940988 TBlobIdQueuePtr&& blobsToReplicatePtr,
941989 TBlobIdQueuePtr&& unreplicatedBlobsPtr,
942990 const std::optional<std::pair<TVDiskID, TActorId>>& donor,
943- TUnreplicatedBlobRecords&& ubr)
991+ TUnreplicatedBlobRecords&& ubr,
992+ TMilestoneQueue&& milestoneQueue)
944993 : TActorBootstrapped<THullReplJobActor>()
945994 , ReplCtx(std::move(replCtx))
946995 , GInfo(ReplCtx->GInfo) // it is safe to take it here
@@ -956,6 +1005,7 @@ namespace NKikimr {
9561005 , BlobsToReplicatePtr(std::move(blobsToReplicatePtr))
9571006 , UnreplicatedBlobsPtr(std::move(unreplicatedBlobsPtr))
9581007 , UnreplicatedBlobRecords(std::move(ubr))
1008+ , MilestoneQueue(std::move(milestoneQueue))
9591009 , Donor(donor)
9601010 {
9611011 if (Donor) {
@@ -987,10 +1037,12 @@ namespace NKikimr {
9871037 TBlobIdQueuePtr blobsToReplicatePtr,
9881038 TBlobIdQueuePtr unreplicatedBlobsPtr,
9891039 const std::optional<std::pair<TVDiskID, TActorId>>& donor,
990- TUnreplicatedBlobRecords&& ubr)
1040+ TUnreplicatedBlobRecords&& ubr,
1041+ TMilestoneQueue&& milestoneQueue)
9911042 {
9921043 return new THullReplJobActor (std::move (replCtx), parentId, startKey, std::move (queueActorMapPtr),
993- std::move (blobsToReplicatePtr), std::move (unreplicatedBlobsPtr), donor, std::move (ubr));
1044+ std::move (blobsToReplicatePtr), std::move (unreplicatedBlobsPtr), donor, std::move (ubr),
1045+ std::move (milestoneQueue));
9941046 }
9951047
9961048} // NKikimr
0 commit comments