@@ -48,12 +48,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) {
4848 IsChanged = true ;
4949}
5050
51- void TBlobState::MarkBlobReadyToPut (ui8 blobIdx) {
52- Y_ABORT_UNLESS (WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present);
53- BlobIdx = blobIdx;
54- IsChanged = true ;
55- }
56-
5751bool TBlobState::Restore (const TBlobStorageGroupInfo &info) {
5852 const TIntervalVec<i32 > fullBlobInterval (0 , Id.BlobSize ());
5953 const TIntervalSet<i32 > here = Whole.Here ();
@@ -227,7 +221,7 @@ TString TBlobState::ToString() const {
227221 for (ui32 i = 0 ; i < Disks.size (); ++i) {
228222 str << Endl << " Disks[" << i << " ]# " << Disks[i].ToString () << Endl;
229223 }
230- str << " BlobIdx# " << (ui32) BlobIdx << Endl;
224+ str << " BlobIdx# " << BlobIdx << Endl;
231225 str << " }" ;
232226 return str.Str ();
233227}
@@ -304,7 +298,7 @@ void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui3
304298}
305299
306300void TGroupDiskRequests::AddPut (ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
307- TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) {
301+ TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx) {
308302 PutsPending.emplace_back (diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx);
309303}
310304
@@ -340,20 +334,6 @@ void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& part
340334 (*this )[id].AddPartToPut (partIdx, std::move (partData));
341335}
342336
343- void TBlackboard::MarkBlobReadyToPut (const TLogoBlobID &id, ui8 blobIdx) {
344- Y_ABORT_UNLESS (bool (id));
345- Y_ABORT_UNLESS (id.PartId () == 0 );
346- Y_ABORT_UNLESS (id.BlobSize () != 0 );
347- (*this )[id].MarkBlobReadyToPut (blobIdx);
348- }
349-
350- void TBlackboard::MoveBlobStateToDone (const TLogoBlobID &id) {
351- Y_ABORT_UNLESS (bool (id));
352- Y_ABORT_UNLESS (id.PartId () == 0 );
353- Y_ABORT_UNLESS (id.BlobSize () != 0 );
354- DoneBlobStates.insert (BlobStates.extract (id));
355- }
356-
357337void TBlackboard::AddPutOkResponse (const TLogoBlobID &id, ui32 orderNumber) {
358338 Y_ABORT_UNLESS (bool (id));
359339 Y_ABORT_UNLESS (id.PartId () != 0 );
@@ -390,8 +370,7 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
390370}
391371
392372EStrategyOutcome TBlackboard::RunStrategies (TLogContext &logCtx, const TStackVec<IStrategy*, 1 >& s,
393- TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
394- TString errorReason;
373+ TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
395374 for (auto it = BlobStates.begin (); it != BlobStates.end (); ) {
396375 auto & blob = it->second ;
397376 if (!std::exchange (blob.IsChanged , false )) {
@@ -401,23 +380,19 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
401380
402381 // recalculate blob outcome if it is not yet determined
403382 NKikimrProto::EReplyStatus status = NKikimrProto::OK;
383+ TString errorReason;
404384 for (IStrategy *strategy : s) {
405385 switch (auto res = strategy->Process (logCtx, blob, *Info, *this , GroupDiskRequests)) {
406386 case EStrategyOutcome::IN_PROGRESS:
407387 status = NKikimrProto::UNKNOWN;
408388 break ;
409389
410390 case EStrategyOutcome::ERROR:
411- if (IsAllRequestsTogether ) {
391+ if (!finished ) {
412392 return res;
413393 }
414- if (errorReason) {
415- errorReason += " && " ;
416- errorReason += res.ErrorReason ;
417- } else {
418- errorReason = res.ErrorReason ;
419- }
420394 status = NKikimrProto::ERROR;
395+ errorReason = std::move (res.ErrorReason );
421396 break ;
422397
423398 case EStrategyOutcome::DONE:
@@ -431,26 +406,25 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
431406 status = NKikimrProto::UNKNOWN;
432407 }
433408 if (status != NKikimrProto::UNKNOWN) {
409+ if (finished) { // we are operating on independent blobs
410+ finished->push_back (TFinishedBlob{
411+ blob.BlobIdx ,
412+ status,
413+ std::move (errorReason),
414+ });
415+ }
434416 const auto [doneIt, inserted, node] = DoneBlobStates.insert (BlobStates.extract (it++));
435417 Y_ABORT_UNLESS (inserted);
436- if (!IsAllRequestsTogether) {
437- blob.Status = status;
438- if (finished) {
439- finished->push_back (&*doneIt);
440- }
441- }
442418 } else {
443419 ++it;
444420 }
445421 }
446422
447- EStrategyOutcome outcome (BlobStates.empty () ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS);
448- outcome.ErrorReason = std::move (errorReason);
449- return outcome;
423+ return BlobStates.empty () ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS;
450424}
451425
452426EStrategyOutcome TBlackboard::RunStrategy (TLogContext &logCtx, const IStrategy& s,
453- TBatchedVec<TBlobStates::value_type* > *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
427+ TBatchedVec<TFinishedBlob > *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
454428 return RunStrategies (logCtx, {const_cast <IStrategy*>(&s)}, finished, expired);
455429}
456430
@@ -464,8 +438,7 @@ TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
464438 << " blobId# " << fullId
465439 << " BlackBoard# " << ToString ());
466440 }
467- TBlobState &state = it->second ;
468- return state;
441+ return it->second ;
469442}
470443
471444ssize_t TBlackboard::AddPartMap (const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex) {
@@ -512,8 +485,12 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T
512485 }
513486}
514487
515- void TBlackboard::RegisterBlobForPut (const TLogoBlobID& id) {
516- (*this )[id];
488+ void TBlackboard::RegisterBlobForPut (const TLogoBlobID& id, size_t blobIdx) {
489+ const auto [it, inserted] = BlobStates.try_emplace (id);
490+ Y_ABORT_UNLESS (inserted);
491+ TBlobState& state = it->second ;
492+ state.Init (id, *Info);
493+ state.BlobIdx = blobIdx;
517494}
518495
519496TBlobState& TBlackboard::operator [](const TLogoBlobID& id) {
@@ -559,9 +536,7 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
559536 const TVDiskID vdiskId = Info->GetVDiskId (orderNumber);
560537 for (auto & [id, state] : BlobStates) {
561538 if (const ui32 diskIdx = Info->GetIdxInSubgroup (vdiskId, id.Hash ()); diskIdx != Info->Type .BlobSubgroupSize ()) {
562- TBlobState::TDisk& disk = state.Disks [diskIdx];
563- for (ui32 partIdx = 0 ; partIdx < disk.DiskParts .size (); ++partIdx) {
564- TBlobState::TDiskPart& part = disk.DiskParts [partIdx];
539+ for (TBlobState::TDiskPart& part : state.Disks [diskIdx].DiskParts ) {
565540 if (part.Situation == TBlobState::ESituation::Present) {
566541 part.Situation = TBlobState::ESituation::Unknown;
567542 if (state.WholeSituation == TBlobState::ESituation::Present) {
0 commit comments