Skip to content

Commit 2751a13

Browse files
alexvrukruall
andauthored
Merge some fixes from main (#1672)
* Fix scrubbing and flapping unittest (#1640) * Fix leaking blobs via using patching (#1639) --------- Co-authored-by: kruall <kruall@ydb.tech>
1 parent 9b503c2 commit 2751a13

15 files changed

+255
-112
lines changed

ydb/core/blobstorage/backpressure/queue.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ class TBlobStorageQueue {
180180
return Queues.InFlight.size();
181181
}
182182

183+
ui64 GetInFlightCost() const {
184+
return InFlightCost;
185+
}
186+
183187
void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
184188
const TBlobStorageGroupType& type);
185189
void InvalidateCosts();

ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
365365
<< " msgId# " << msgId << " sequenceId# " << sequenceId
366366
<< " expectedMsgId# " << expectedMsgId << " expectedSequenceId# " << expectedSequenceId
367367
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
368-
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws));
368+
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws)
369+
<< " InFlightCost# " << Queue.GetInFlightCost()
370+
<< " InFlightCount# " << Queue.InFlightCount()
371+
<< " ItemsWaiting# " << Queue.GetItemsWaiting()
372+
<< " BytesWaiting# " << Queue.GetBytesWaiting());
369373

370374
switch (ws) {
371375
case NKikimrBlobStorage::TWindowFeedback::IncorrectMsgId:

ydb/core/blobstorage/backpressure/queue_backpressure_server.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,7 @@ namespace NKikimr {
196196
}
197197
}
198198

199-
TWindowStatus *Processed(bool checkMsgId, const TMessageId &msgId, ui64 cost, TWindowStatus *opStatus) {
200-
Y_UNUSED(checkMsgId);
201-
Y_UNUSED(msgId);
199+
TWindowStatus *Processed(bool /*checkMsgId*/, const TMessageId& /*msgId*/, ui64 cost, TWindowStatus *opStatus) {
202200
Y_ABORT_UNLESS(Cost >= cost);
203201
Cost -= cost;
204202
--InFlight;

ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl
293293
msg->SetId(ReaderTabletData->Id);
294294
msg->SetGeneration(ReaderTabletData->Generation);
295295
}
296-
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
297-
<< " vget# " << vget->ToString());
298296
}
299297

300298
for (auto& vget : gets) {
301299
if (vget) {
300+
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
301+
<< Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
302+
<< " vget# " << vget->ToString());
302303
outVGets.push_back(std::move(vget));
303304
++RequestIndex;
304305
}

ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
6868
TStackVec<bool, TypicalDisksInSubring> ReceivedResponseFlags;
6969
TStackVec<bool, TypicalDisksInSubring> EmptyResponseFlags;
7070
TStackVec<bool, TypicalDisksInSubring> ErrorResponseFlags;
71+
TStackVec<bool, TypicalDisksInSubring> ForceStopFlags;
7172
TBlobStorageGroupInfo::TVDiskIds VDisks;
7273

7374
bool UseVPatch = false;
@@ -332,8 +333,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
332333
}
333334

334335
void Handle(TEvBlobStorage::TEvVPatchResult::TPtr &ev) {
335-
ReceivedResults++;
336336
NKikimrBlobStorage::TEvVPatchResult &record = ev->Get()->Record;
337+
338+
Y_ABORT_UNLESS(record.HasCookie());
339+
ui8 subgroupIdx = record.GetCookie();
340+
if (ForceStopFlags[subgroupIdx]) {
341+
return; // ignore force stop response
342+
}
343+
ReceivedResults++;
344+
337345
PullOutStatusFlagsAndFressSpace(record);
338346
Y_ABORT_UNLESS(record.HasStatus());
339347
NKikimrProto::EReplyStatus status = record.GetStatus();
@@ -342,9 +350,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
342350
errorReason = record.GetErrorReason();
343351
}
344352

345-
Y_ABORT_UNLESS(record.HasCookie());
346-
ui8 subgroupIdx = record.GetCookie();
347-
348353
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
349354
(Status, status),
350355
(SubgroupIdx, (ui32)subgroupIdx),
@@ -413,15 +418,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
413418
void SendStopDiffs() {
414419
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA18, "Send stop diffs");
415420
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchDiff>> events;
416-
for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) {
417-
if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) {
421+
for (ui32 subgroupIdx = 0; subgroupIdx < VDisks.size(); ++subgroupIdx) {
422+
if (!ErrorResponseFlags[subgroupIdx] && !EmptyResponseFlags[subgroupIdx] && ReceivedResponseFlags[subgroupIdx]) {
418423
std::unique_ptr<TEvBlobStorage::TEvVPatchDiff> ev = std::make_unique<TEvBlobStorage::TEvVPatchDiff>(
419-
OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx);
424+
OriginalId, PatchedId, VDisks[subgroupIdx], 0, Deadline, subgroupIdx);
420425
ev->SetForceEnd();
426+
ForceStopFlags[subgroupIdx] = true;
421427
events.emplace_back(std::move(ev));
422428
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message",
423-
(VDiskIdxInSubgroup, vdiskIdx),
424-
(VDiskId, VDisks[vdiskIdx]));
429+
(VDiskIdxInSubgroup, subgroupIdx),
430+
(VDiskId, VDisks[subgroupIdx]));
425431
}
426432
}
427433
SendToQueues(events, false);
@@ -495,6 +501,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
495501
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
496502
(VDiskIdxInSubgroup, idxInSubgroup),
497503
(PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
504+
(PartId, (ui64)partPlacement.PartId),
498505
(DiffsForPart, diffsForPart.size()),
499506
(ParityPlacements, parityPlacements.size()),
500507
(WaitedXorDiffs, waitedXorDiffs));
@@ -586,6 +593,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
586593
ReceivedResponseFlags.assign(VDisks.size(), false);
587594
ErrorResponseFlags.assign(VDisks.size(), false);
588595
EmptyResponseFlags.assign(VDisks.size(), false);
596+
ForceStopFlags.assign(VDisks.size(), false);
589597

590598
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchStart>> events;
591599

ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ struct TPDiskMockState::TImpl {
207207
}
208208
}
209209

210+
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
211+
const ui64 chunkBegin = ui64(chunkIdx) * ChunkSize;
212+
return static_cast<bool>(Corrupted & TIntervalSet{chunkBegin + begin, chunkBegin + end});
213+
}
214+
210215
std::set<ui32> GetChunks() {
211216
std::set<ui32> res;
212217
for (auto& [ownerId, owner] : Owners) {
@@ -290,6 +295,10 @@ void TPDiskMockState::SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool
290295
Impl->SetCorruptedArea(chunkIdx, begin, end, enabled);
291296
}
292297

298+
bool TPDiskMockState::HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
299+
return Impl->HasCorruptedArea(chunkIdx, begin, end);
300+
}
301+
293302
std::set<ui32> TPDiskMockState::GetChunks() {
294303
return Impl->GetChunks();
295304
}

ydb/core/blobstorage/pdisk/mock/pdisk_mock.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ namespace NKikimr {
2626
~TPDiskMockState();
2727

2828
void SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool enabled);
29+
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end);
2930
std::set<ui32> GetChunks();
3031
TMaybe<NPDisk::TOwnerRound> GetOwnerRound(const TVDiskID& vDiskId) const;
3132
ui32 GetChunkSize() const;

ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp

Lines changed: 109 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,64 +16,130 @@ void Test() {
1616

1717
TString data = TString::Uninitialized(8_MB);
1818
memset(data.Detach(), 'X', data.size());
19-
TLogoBlobID id(1, 1, 1, 0, data.size(), 0);
20-
21-
{ // write data to group
22-
TActorId sender = runtime->AllocateEdgeActor(1);
23-
runtime->WrapInActorContext(sender, [&] {
24-
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
25-
});
26-
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
27-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
28-
}
2919

30-
auto checkReadable = [&](NKikimrProto::EReplyStatus status) {
31-
TActorId sender = runtime->AllocateEdgeActor(1);
32-
runtime->WrapInActorContext(sender, [&] {
33-
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
34-
NKikimrBlobStorage::EGetHandleClass::FastRead));
35-
});
36-
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
37-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
38-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
39-
auto& r = res->Get()->Responses[0];
40-
UNIT_ASSERT_VALUES_EQUAL(r.Status, status);
41-
if (status == NKikimrProto::OK) {
42-
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
20+
for (ui32 step = 1; step < 100; ++step) {
21+
TLogoBlobID id(1, 1, step, 0, data.size(), 0);
22+
23+
{ // write data to group
24+
TActorId sender = runtime->AllocateEdgeActor(1);
25+
runtime->WrapInActorContext(sender, [&] {
26+
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
27+
});
28+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
29+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
4330
}
44-
};
4531

46-
checkReadable(NKikimrProto::OK);
32+
auto checkReadable = [&] {
33+
TActorId sender = runtime->AllocateEdgeActor(1);
34+
runtime->WrapInActorContext(sender, [&] {
35+
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
36+
NKikimrBlobStorage::EGetHandleClass::FastRead));
37+
});
38+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
39+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
40+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
41+
auto& r = res->Get()->Responses[0];
42+
UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK);
43+
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
44+
45+
ui32 partsMask = 0;
46+
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
47+
const TVDiskID& vdiskId = info->GetVDiskId(i);
48+
env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
49+
const TActorId sender = runtime->AllocateEdgeActor(1);
50+
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
51+
NKikimrBlobStorage::EGetHandleClass::FastRead);
52+
ev->AddExtremeQuery(id, 0, 0);
53+
runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId());
54+
auto reply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender);
55+
auto& record = reply->Get()->Record;
56+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
57+
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
58+
for (const auto& result : record.GetResult()) {
59+
if (result.GetStatus() == NKikimrProto::OK) {
60+
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
61+
UNIT_ASSERT(id.PartId());
62+
const ui32 partIdx = id.PartId() - 1;
63+
const ui32 mask = 1 << partIdx;
64+
UNIT_ASSERT(!(partsMask & mask));
65+
partsMask |= mask;
66+
} else {
67+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
68+
}
69+
}
70+
});
71+
}
72+
UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1);
73+
};
4774

48-
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
49-
const TActorId vdiskActorId = info->GetActorId(i);
75+
checkReadable();
5076

51-
ui32 nodeId, pdiskId;
52-
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
53-
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
54-
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
77+
ui32 mask = 0;
5578

56-
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
57-
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
58-
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
79+
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
80+
const TActorId vdiskActorId = info->GetActorId(i);
5981

60-
for (auto& item : res->Get()->Layout) {
61-
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
62-
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) {
63-
const TDiskPart& part = item.Location;
64-
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true);
65-
break;
82+
ui32 nodeId, pdiskId;
83+
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
84+
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
85+
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
86+
87+
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
88+
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
89+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
90+
91+
for (auto& item : res->Get()->Layout) {
92+
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
93+
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
94+
const TDiskPart& part = item.Location;
95+
mask |= 1 << i;
96+
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true);
97+
break;
98+
}
6699
}
100+
101+
checkReadable();
67102
}
68103

69-
checkReadable(NKikimrProto::OK);
70-
}
104+
env.Sim(TDuration::Seconds(60));
71105

72-
env.Sim(TDuration::Seconds(60));
106+
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
107+
if (~mask >> i & 1) {
108+
continue;
109+
}
110+
111+
const TActorId vdiskActorId = info->GetActorId(i);
112+
113+
ui32 nodeId, pdiskId;
114+
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
115+
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
116+
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
117+
118+
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
119+
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
120+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
121+
122+
bool anyPartReadable = false;
123+
124+
for (auto& item : res->Get()->Layout) {
125+
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
126+
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
127+
const TDiskPart& part = item.Location;
128+
anyPartReadable = !it->second->HasCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size);
129+
if (anyPartReadable) {
130+
break;
131+
}
132+
}
133+
}
134+
135+
UNIT_ASSERT(anyPartReadable);
136+
}
137+
}
73138
}
74139

75140
Y_UNIT_TEST_SUITE(ScrubFast) {
76141
Y_UNIT_TEST(SingleBlob) {
77142
Test();
78143
}
79144
}
145+

ydb/core/blobstorage/vdisk/common/vdisk_events.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,12 @@ namespace NKikimr {
366366
: public TEventLocal<TEvVDiskRequestCompleted, TEvBlobStorage::EvVDiskRequestCompleted> {
367367
TVMsgContext Ctx;
368368
std::unique_ptr<IEventHandle> Event;
369+
bool DoNotResend;
369370

370-
TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event)
371+
TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event, bool doNotResend = false)
371372
: Ctx(ctx)
372373
, Event(std::move(event))
374+
, DoNotResend(doNotResend)
373375
{
374376
Y_DEBUG_ABORT_UNLESS(Ctx.ExtQueueId != NKikimrBlobStorage::EVDiskQueueId::Unknown);
375377
Y_DEBUG_ABORT_UNLESS(Ctx.IntQueueId != NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown);
@@ -468,6 +470,9 @@ namespace NKikimr {
468470
TActorIDPtr SkeletonFrontIDPtr;
469471
THPTimer ExecutionTimer;
470472

473+
protected:
474+
bool DoNotResendFromSkeletonFront = false;
475+
471476
public:
472477
TEvVResultBaseWithQoSPB() = default;
473478

@@ -526,7 +531,7 @@ namespace NKikimr {
526531
byteSize, this->ToString().data());
527532

528533
if (SkeletonFrontIDPtr && MsgCtx.IntQueueId != NKikimrBlobStorage::IntUnknown) {
529-
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev)));
534+
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev), DoNotResendFromSkeletonFront));
530535
} else {
531536
TActivationContext::Send(ev.release());
532537
}
@@ -2182,6 +2187,10 @@ namespace NKikimr {
21822187
Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare);
21832188
}
21842189

2190+
void SetForceEndResponse() {
2191+
DoNotResendFromSkeletonFront = true;
2192+
}
2193+
21852194
void MakeError(NKikimrProto::EReplyStatus status, const TString &errorReason,
21862195
const NKikimrBlobStorage::TEvVPatchDiff &request)
21872196
{

ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ namespace NKikimr {
9999

100100
// a map to fill upon receiving VGet result
101101
struct TPerBlobInfo {
102-
const TInstant Deadline;
103102
std::weak_ptr<TInFlightContext> Context;
104103
TEvRecoverBlobResult::TItem *Item; // item to update
105104
ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob
106105
};
107106
std::unordered_multimap<TLogoBlobID, TPerBlobInfo, THash<TLogoBlobID>> VGetResultMap;
107+
std::set<std::tuple<TVDiskIdShort, TLogoBlobID>> GetsInFlight;
108108

109109
void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr<TInFlightContext>& context, TEvRecoverBlobResult::TItem *item);
110-
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize);
110+
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup);
111111
void SendPendingQueries();
112112
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev);
113113
NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item);

0 commit comments

Comments
 (0)