Skip to content

Commit 8784284

Browse files
committed
Add timeouts and fix MovedPatch
1 parent 096a641 commit 8784284

File tree

14 files changed

+351
-115
lines changed

14 files changed

+351
-115
lines changed

ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp

Lines changed: 87 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,33 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
2929
}
3030
};
3131

32-
enum EWakeUpTag : ui64{
32+
enum EWakeUpTag : ui64 {
3333
VPatchStartTag,
3434
VPatchDiffTag,
3535
MovedPatchTag,
3636
NeverTag,
3737
};
3838

39+
static TString ToString(ui64 wakeUp) {
40+
switch (wakeUp) {
41+
case VPatchStartTag: return "VPatchStartTag";
42+
case VPatchDiffTag: return "VPatchDiffTag";
43+
case MovedPatchTag: return "MovedPatchTag";
44+
case NeverTag: return "NeverTag";
45+
default: return "unknown@" + ToString(wakeUp);
46+
}
47+
}
48+
3949
static constexpr ui32 TypicalHandoffCount = 2;
4050
static constexpr ui32 TypicalPartPlacementCount = 1 + TypicalHandoffCount;
4151
static constexpr ui32 TypicalMaxPartsCount = TypicalPartPlacementCount * TypicalPartsInBlob;
4252

43-
static constexpr ui32 VPatchStartWaitingMultiplier = 1;
44-
static constexpr ui32 VPatchDiffWaitingMultiplier = 2;
53+
static constexpr ui32 VPatchStartWaitingMultiplier = 2;
54+
static constexpr ui32 VPatchDiffWaitingMultiplier = 6;
4555
static constexpr ui32 MovedPatchWaitingMultiplier = 4;
4656

57+
static constexpr ui32 DefaultNsForChangeStrategy = 30'000'000; // 30 ms
58+
4759
TString Buffer;
4860

4961
TGroupId OriginalGroupId;
@@ -58,21 +70,18 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
5870
float ApproximateFreeSpaceShare = 0;
5971

6072
TInstant StartTime;
73+
TInstant StageStart;
6174
TInstant Deadline;
6275

6376
NLWTrace::TOrbit Orbit;
6477
TString ErrorReason;
6578

66-
ui32 SendedGetRequests = 0;
67-
ui32 ReceivedGetResponses = 0;
68-
ui32 SendedPutRequests = 0;
69-
ui32 ReceivedPutResponses = 0;
70-
7179
TVector<ui32> OkVDisksWithParts;
7280

7381
ui32 SentStarts = 0;
7482
ui32 ReceivedFoundParts = 0;
7583
ui32 ErrorResponses = 0;
84+
ui32 SentVPatchDiff = 0;
7685
ui32 ReceivedResults = 0;
7786

7887
TStackVec<TPartPlacement, TypicalMaxPartsCount> FoundParts;
@@ -83,8 +92,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
8392
TStackVec<bool, TypicalDisksInSubring> SlowFlags;
8493
TBlobStorageGroupInfo::TVDiskIds VDisks;
8594

86-
TInstant VPatchDiffDeadline;
87-
8895
bool UseVPatch = false;
8996
bool IsGoodPatchedBlobId = false;
9097
bool IsAllowedErasure = false;
@@ -107,6 +114,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
107114
// PATCH_LOG
108115

109116
public:
117+
118+
void ScheduleWakeUp(TInstant startTime, EWakeUpTag tag) {
119+
TDuration duration = TActivationContext::Now() - startTime;
120+
Schedule(duration, new TEvents::TEvWakeup(tag));
121+
}
122+
123+
void ScheduleWakeUp(EWakeUpTag tag) {
124+
ScheduleWakeUp(StageStart, tag);
125+
}
126+
110127
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
111128
return NKikimrServices::TActivity::BS_PROXY_PATCH_ACTOR;
112129
}
@@ -223,7 +240,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
223240
ApplyDiffs();
224241

225242
std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline,
226-
NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault);
243+
NKikimrBlobStorage::UserData, TEvBlobStorage::TEvPut::TacticDefault);
227244
put->Orbit = std::move(Orbit);
228245
SendToProxy(std::move(put), OriginalId.Hash(), Span.GetTraceId());
229246
}
@@ -311,6 +328,12 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
311328
void Handle(TEvBlobStorage::TEvVPatchFoundParts::TPtr &ev) {
312329
ReceivedFoundParts++;
313330

331+
if (Info->Type.ErasureFamily() != TErasureType::ErasureMirror) {
332+
if (ReceivedFoundParts == SentStarts / 2 + SentStarts % 2) {
333+
ScheduleWakeUp(VPatchStartTag);
334+
}
335+
}
336+
314337
NKikimrBlobStorage::TEvVPatchFoundParts &record = ev->Get()->Record;
315338

316339
Y_ABORT_UNLESS(record.HasCookie());
@@ -344,6 +367,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
344367
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA26, "Received VPatchFoundParts",
345368
(Status, status),
346369
(SubgroupIdx, (ui32)subgroupIdx),
370+
(VDiskId, VDisks[subgroupIdx]),
347371
(ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedFoundParts << '/' << SentStarts)),
348372
(ErrorReason, errorReason));
349373

@@ -373,6 +397,13 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
373397
}
374398
ReceivedResults++;
375399

400+
401+
if (Info->Type.ErasureFamily() != TErasureType::ErasureMirror) {
402+
if (ReceivedResults == SentVPatchDiff / 2 + SentVPatchDiff % 2) {
403+
ScheduleWakeUp(VPatchDiffTag);
404+
}
405+
}
406+
376407
PullOutStatusFlagsAndFressSpace(record);
377408
Y_ABORT_UNLESS(record.HasStatus());
378409
NKikimrProto::EReplyStatus status = record.GetStatus();
@@ -384,6 +415,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
384415
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
385416
(Status, status),
386417
(SubgroupIdx, (ui32)subgroupIdx),
418+
(VDiskID, VDisks[subgroupIdx]),
387419
(ReceivedResults, static_cast<TString>(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())),
388420
(ErrorReason, errorReason));
389421

@@ -511,7 +543,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
511543

512544
ui32 waitedXorDiffs = (partPlacement.PartId > dataParts) ? dataPartCount : 0;
513545
auto ev = std::make_unique<TEvBlobStorage::TEvVPatchDiff>(originalPartBlobId, partchedPartBlobId,
514-
VDisks[idxInSubgroup], waitedXorDiffs, VPatchDiffDeadline, idxInSubgroup);
546+
VDisks[idxInSubgroup], waitedXorDiffs, Deadline, idxInSubgroup);
515547

516548
ui32 diffForPartIdx = 0;
517549
if (Info->Type.ErasureFamily() != TErasureType::ErasureMirror) {
@@ -531,6 +563,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
531563
}
532564
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
533565
(VDiskIdxInSubgroup, idxInSubgroup),
566+
(VDiskId, VDisks[idxInSubgroup]),
534567
(PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
535568
(PartId, (ui64)partPlacement.PartId),
536569
(DiffsForPart, diffsForPart.size()),
@@ -561,31 +594,24 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
561594
ui32 vdiskIdx = vdiskIdxForParts[partIdx];
562595
Y_VERIFY_S(vdiskIdx == partIdx || vdiskIdx >= dataParts, "vdiskIdx# " << vdiskIdx << " partIdx# " << partIdx);
563596
placements.push_back(TPartPlacement{static_cast<ui8>(vdiskIdx), static_cast<ui8>(partIdx + 1)});
597+
SentVPatchDiff++;
564598
}
565599
SendDiffs(placements);
566600
}
567601

568602
void StartMovedPatch() {
569603
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA09, "Start Moved strategy",
570604
(SentStarts, SentStarts));
605+
ScheduleWakeUp(StartTime, MovedPatchTag);
571606
Become(&TThis::MovedPatchState);
572607
TInstant movedPatchDeadline = Deadline;
573608
IsMovedPatch = true;
574609
ui32 subgroupIdx = 0;
610+
575611
if (OkVDisksWithParts) {
576612
ui32 okVDiskIdx = RandomNumber<ui32>(OkVDisksWithParts.size());
577613
subgroupIdx = OkVDisksWithParts[okVDiskIdx];
578614
} else {
579-
ui64 worstNs = 0;
580-
ui64 nextToWorstNs = 0;
581-
i32 worstSubGroubIdx = -1;
582-
GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::PutAsyncBlob, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
583-
if (worstNs * 2 > nextToWorstNs) {
584-
SlowFlags[worstSubGroubIdx] = true;
585-
}
586-
Schedule(TDuration::MicroSeconds(nextToWorstNs * MovedPatchWaitingMultiplier / 1000), new TEvents::TEvWakeup(MovedPatchTag));
587-
movedPatchDeadline = TActivationContext::Now() + TDuration::MicroSeconds(nextToWorstNs * MovedPatchWaitingMultiplier * 0.9 / 1000);
588-
589615
if (HasSlowVDisk) {
590616
TStackVec<ui32, TypicalDisksInSubring> goodDisks;
591617
for (ui32 idx = 0; idx < VDisks.size(); ++idx) {
@@ -628,7 +654,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
628654

629655
void StartFallback() {
630656
Mon->PatchesWithFallback->Inc();
631-
if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured) {
657+
if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured && !IsMovedPatch) {
632658
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA05, "Start Moved strategy from fallback");
633659
StartMovedPatch();
634660
} else {
@@ -640,6 +666,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
640666
}
641667

642668
void StartVPatch() {
669+
StageStart = TActivationContext::Now();
643670
Become(&TThis::VPatchState);
644671
ReceivedResponseFlags.assign(VDisks.size(), false);
645672
ErrorResponseFlags.assign(VDisks.size(), false);
@@ -648,22 +675,10 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
648675
SlowFlags.assign(VDisks.size(), false);
649676

650677
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchStart>> events;
651-
652-
TInstant vpatchStartDeadline = Deadline;
653-
ui64 worstNs = 0;
654-
ui64 nextToWorstNs = 0;
655-
i32 worstSubGroubIdx = -1;
656-
GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::GetFastRead, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
657-
if (worstNs * 2 > nextToWorstNs) {
658-
SlowFlags[worstSubGroubIdx] = true;
659-
}
660-
Schedule(TDuration::MicroSeconds(nextToWorstNs * VPatchStartWaitingMultiplier / 1000), new TEvents::TEvWakeup(VPatchStartTag));
661-
vpatchStartDeadline = TActivationContext::Now() + TDuration::MicroSeconds(nextToWorstNs * (VPatchStartWaitingMultiplier + VPatchDiffWaitingMultiplier) * 0.9 / 1000);
662-
663678
for (ui32 idx = 0; idx < VDisks.size(); ++idx) {
664679
if (!SlowFlags[idx]) {
665680
std::unique_ptr<TEvBlobStorage::TEvVPatchStart> ev = std::make_unique<TEvBlobStorage::TEvVPatchStart>(
666-
OriginalId, PatchedId, VDisks[idx], vpatchStartDeadline, idx, true);
681+
OriginalId, PatchedId, VDisks[idx], Deadline, idx, true);
667682
events.emplace_back(std::move(ev));
668683
SentStarts++;
669684
}
@@ -767,19 +782,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
767782
bool ContinueVPatch() {
768783
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA15, "Continue VPatch strategy",
769784
(FoundParts, ConvertFoundPartsToString()));
785+
StageStart = TActivationContext::Now();
770786
IsContinuedVPatch = true;
771787

772-
VPatchDiffDeadline = Deadline;
773-
ui64 worstNs = 0;
774-
ui64 nextToWorstNs = 0;
775-
i32 worstSubGroubIdx = -1;
776-
GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::GetFastRead, &worstNs, &nextToWorstNs, &worstSubGroubIdx);
777-
if (worstNs * 2 > nextToWorstNs) {
778-
SlowFlags[worstSubGroubIdx] = true;
779-
}
780-
Schedule(TDuration::MicroSeconds(nextToWorstNs * VPatchDiffWaitingMultiplier / 1000), new TEvents::TEvWakeup(VPatchDiffTag));
781-
VPatchDiffDeadline = Min(Deadline, TActivationContext::Now() + TDuration::MicroSeconds(nextToWorstNs * VPatchDiffWaitingMultiplier * 0.9 / 1000));
782-
783788
if (Info->Type.GetErasure() == TErasureType::ErasureMirror3dc) {
784789
return ContinueVPatchForMirror3dc();
785790
}
@@ -881,6 +886,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
881886

882887
void Bootstrap() {
883888
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA01, "Actor bootstrapped");
889+
Schedule(TDuration::MicroSeconds(60'000'000), new TEvents::TEvWakeup(NeverTag));
884890

885891
TLogoBlobID truePatchedBlobId = PatchedId;
886892
bool result = true;
@@ -926,14 +932,39 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
926932

927933
template <ui64 ExpectedTag>
928934
void HandleWakeUp(TEvents::TEvWakeup::TPtr &ev) {
935+
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA36, "HandleWakeUp",
936+
(ExpectedTag, ToString(ExpectedTag)),
937+
(ReceivedTag, ToString(ev->Get()->Tag)));
929938
if (ev->Get()->Tag == ExpectedTag) {
930939
StartFallback();
931940
}
941+
if (ev->Get()->Tag == NeverTag) {
942+
StartFallback();
943+
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA40, "Found NeverTag wake up", (ExpectedTag, ToString(ExpectedTag)));
944+
}
932945
}
933946

934947
void HandleVPatchWakeUp(TEvents::TEvWakeup::TPtr &ev) {
935-
if (ev->Get()->Tag == (IsContinuedVPatch ? VPatchDiffTag : VPatchStartTag)) {
948+
ui64 expectedTag = (IsContinuedVPatch ? VPatchDiffTag : VPatchStartTag);
949+
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA37, "HandleWakeUp",
950+
(ExpectedTag, ToString(expectedTag)),
951+
(ReceivedTag, ToString(ev->Get()->Tag)));
952+
if (ev->Get()->Tag == expectedTag) {
953+
StartFallback();
954+
}
955+
if (ev->Get()->Tag == NeverTag) {
936956
StartFallback();
957+
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA41, "Found NeverTag wake up", (ExpectedTag, ToString(expectedTag)));
958+
}
959+
}
960+
961+
void HandleNeverTagWakeUp(TEvents::TEvWakeup::TPtr &ev) {
962+
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA42, "HandleWakeUp",
963+
(ExpectedTag, ToString(NeverTag)),
964+
(ReceivedTag, ToString(ev->Get()->Tag)));
965+
if (ev->Get()->Tag == NeverTag) {
966+
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA43, "Found NeverTag wake up in naive state");
967+
ReplyAndDie(NKikimrProto::DEADLINE);
937968
}
938969
}
939970

@@ -945,10 +976,13 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
945976
hFunc(TEvBlobStorage::TEvGetResult, Handle);
946977
hFunc(TEvBlobStorage::TEvPutResult, Handle);
947978

948-
hFunc(TEvents::TEvWakeup, HandleWakeUp<NeverTag>);
979+
IgnoreFunc(TEvents::TEvWakeup);
980+
//hFunc(TEvents::TEvWakeup, HandleWakeUp<NeverTag>);
949981
IgnoreFunc(TEvBlobStorage::TEvVPatchResult);
982+
IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts);
983+
IgnoreFunc(TEvBlobStorage::TEvVMovedPatchResult);
950984
default:
951-
Y_ABORT("Received unknown event");
985+
Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
952986
};
953987
}
954988

@@ -960,8 +994,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
960994
hFunc(TEvBlobStorage::TEvVMovedPatchResult, Handle);
961995
hFunc(TEvents::TEvWakeup, HandleWakeUp<MovedPatchTag>);
962996
IgnoreFunc(TEvBlobStorage::TEvVPatchResult);
997+
IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts);
963998
default:
964-
Y_ABORT("Received unknown event");
999+
Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
9651000
};
9661001
}
9671002

@@ -974,7 +1009,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
9741009
hFunc(TEvBlobStorage::TEvVPatchResult, Handle);
9751010
hFunc(TEvents::TEvWakeup, HandleVPatchWakeUp);
9761011
default:
977-
Y_ABORT("Received unknown event");
1012+
Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase()));
9781013
};
9791014
}
9801015
};

ydb/core/blobstorage/dsproxy/group_sessions.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@ static NKikimrBlobStorage::EVDiskQueueId VDiskQueues[] = {
1212
NKikimrBlobStorage::EVDiskQueueId::GetFastRead,
1313
NKikimrBlobStorage::EVDiskQueueId::GetDiscover,
1414
NKikimrBlobStorage::EVDiskQueueId::GetLowRead,
15+
NKikimrBlobStorage::EVDiskQueueId::PatchSubRequest,
1516
};
1617

1718
TString QueueIdName(NKikimrBlobStorage::EVDiskQueueId queueId) {
1819
switch (queueId) {
19-
case NKikimrBlobStorage::EVDiskQueueId::PutTabletLog: return "PutTabletLog";
20-
case NKikimrBlobStorage::EVDiskQueueId::PutAsyncBlob: return "PutAsyncBlob";
21-
case NKikimrBlobStorage::EVDiskQueueId::PutUserData: return "PutUserData";
22-
case NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead: return "GetAsyncRead";
23-
case NKikimrBlobStorage::EVDiskQueueId::GetFastRead: return "GetFastRead";
24-
case NKikimrBlobStorage::EVDiskQueueId::GetDiscover: return "GetDiscover";
25-
case NKikimrBlobStorage::EVDiskQueueId::GetLowRead: return "GetLowRead";
20+
case NKikimrBlobStorage::EVDiskQueueId::PutTabletLog: return "PutTabletLog";
21+
case NKikimrBlobStorage::EVDiskQueueId::PutAsyncBlob: return "PutAsyncBlob";
22+
case NKikimrBlobStorage::EVDiskQueueId::PutUserData: return "PutUserData";
23+
case NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead: return "GetAsyncRead";
24+
case NKikimrBlobStorage::EVDiskQueueId::GetFastRead: return "GetFastRead";
25+
case NKikimrBlobStorage::EVDiskQueueId::GetDiscover: return "GetDiscover";
26+
case NKikimrBlobStorage::EVDiskQueueId::GetLowRead: return "GetLowRead";
27+
case NKikimrBlobStorage::EVDiskQueueId::PatchSubRequest: return "PatchSubRequest";
2628
default: Y_ABORT("unexpected EVDiskQueueId");
2729
}
2830
}
@@ -59,6 +61,7 @@ TGroupSessions::TGroupSessions(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
5961
case NKikimrBlobStorage::PutAsyncBlob:
6062
case NKikimrBlobStorage::GetAsyncRead:
6163
case NKikimrBlobStorage::GetLowRead:
64+
case NKikimrBlobStorage::PatchSubRequest:
6265
interconnectChannel = TInterconnectChannels::IC_BLOBSTORAGE_ASYNC_DATA;
6366
break;
6467

0 commit comments

Comments
 (0)