Skip to content

Commit 6b27219

Browse files
authored
Merge 1c03cba into 5b14371
2 parents 5b14371 + 1c03cba commit 6b27219

File tree

11 files changed

+263
-5
lines changed

11 files changed

+263
-5
lines changed

ydb/core/base/blobstorage.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,7 @@ struct TEvBlobStorage {
11541154
ui32 RestartCounter = 0;
11551155
bool PhantomCheck = false;
11561156
bool Decommission = false; // is it generated by decommission actor and should be handled by the underlying proxy?
1157+
bool CheckIntegrity = false; // check integrity of the blob: placement and data
11571158
std::shared_ptr<TExecutionRelay> ExecutionRelay;
11581159

11591160
struct TTabletData {
@@ -1288,6 +1289,18 @@ struct TEvBlobStorage {
12881289
bool DoNotKeep = false;
12891290
std::optional<bool> LooksLikePhantom; // filled only when PhantomCheck is true
12901291

1292+
// Status values when CheckIntegrity is true:
1293+
// OK - blob parts are placed according to fail model
1294+
// ERROR - blob parts are definitely placed incorrectly or there are missing parts
1295+
// UNKNOWN - status is unknown because of missing disks
1296+
1297+
// TODO:
1298+
NKikimrProto::EReplyStatus DataStatus;
1299+
// DataStatus is filled when CheckIntegrity is true
1300+
// OK - all data parts contain valid data
1301+
// ERROR - some parts definitely contain invalid data
1302+
// UNKNOWN - status is unknown because of missing disks
1303+
12911304
TResponse()
12921305
: Status(NKikimrProto::UNKNOWN)
12931306
, Shift(0)
@@ -1296,6 +1309,9 @@ struct TEvBlobStorage {
12961309
};
12971310

12981311
NKikimrProto::EReplyStatus Status;
1312+
// Status values when CheckIntegrity is true:
1313+
// OK - we were able to check the integrity
1314+
// ERROR - group is disintegrated or any other error preventing to complete the check
12991315

13001316
// todo: replace with queue-like thing
13011317
ui32 ResponseSz;

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,16 @@ struct TBlobStorageGroupRestoreGetParameters {
427427
};
428428
IActor* CreateBlobStorageGroupIndexRestoreGetRequest(TBlobStorageGroupRestoreGetParameters params);
429429

430+
struct TBlobStorageGroupCheckIntegrityGetParameters {
431+
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvGet> Common;
432+
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
433+
.LogComponent = NKikimrServices::BS_PROXY_CHECKINTEGRITYGET,
434+
.Name = "DSProxy.CheckIntegrityGet",
435+
.Activity = NKikimrServices::TActivity::BS_PROXY_CHECKINTEGRITYGET_ACTOR,
436+
};
437+
};
438+
IActor* CreateBlobStorageGroupCheckIntegrityGetRequest(TBlobStorageGroupCheckIntegrityGetParameters params);
439+
430440
struct TBlobStorageGroupDiscoverParameters {
431441
TBlobStorageGroupRequestActor::TCommonParameters<TEvBlobStorage::TEvDiscover> Common;
432442
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
#include "dsproxy.h"
2+
#include "dsproxy_mon.h"
3+
#include "dsproxy_quorum_tracker.h"
4+
#include "dsproxy_blob_tracker.h"
5+
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
6+
7+
namespace NKikimr {
8+
9+
class TBlobStorageGroupCheckIntegrityGetRequest : public TBlobStorageGroupRequestActor {
10+
const ui32 QuerySize;
11+
TArrayHolder<TEvBlobStorage::TEvGet::TQuery> Queries;
12+
const TInstant Deadline;
13+
NKikimrBlobStorage::EGetHandleClass GetHandleClass;
14+
15+
TGroupQuorumTracker QuorumTracker;
16+
std::unique_ptr<TBlobStatusTracker> BlobStatusTracker;
17+
ui32 VGetsInFlight = 0;
18+
19+
std::unique_ptr<TEvBlobStorage::TEvGetResult> PendingResult;
20+
21+
void ReplyAndDie(NKikimrProto::EReplyStatus status) override {
22+
Mon->CountCheckIntegrityGetResponseTime(TActivationContext::Monotonic() - RequestStartTime);
23+
24+
if (status != NKikimrProto::OK) {
25+
PendingResult.reset(new TEvBlobStorage::TEvGetResult(status, 1, Info->GroupID));
26+
PendingResult->ErrorReason = ErrorReason;
27+
28+
auto& response = PendingResult->Responses[0];
29+
response.Status = NKikimrProto::UNKNOWN;
30+
response.Id = Queries[0].Id;
31+
response.Shift = Queries[0].Shift;
32+
response.RequestedSize = Queries[0].Size;
33+
}
34+
35+
SendResponseAndDie(std::move(PendingResult));
36+
}
37+
38+
TString DumpBlobStatus() const {
39+
TStringStream str;
40+
BlobStatusTracker->Output(str, Info.Get());
41+
return str.Str();
42+
}
43+
44+
void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
45+
ProcessReplyFromQueue(ev->Get());
46+
47+
const NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
48+
49+
Y_ABORT_UNLESS(record.HasStatus());
50+
NKikimrProto::EReplyStatus status = record.GetStatus();
51+
52+
Y_ABORT_UNLESS(record.HasVDiskID());
53+
const TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
54+
55+
Y_ABORT_UNLESS(VGetsInFlight > 0);
56+
--VGetsInFlight;
57+
58+
switch (NKikimrProto::EReplyStatus newStatus = QuorumTracker.ProcessReply(vDiskId, status)) {
59+
case NKikimrProto::ERROR:
60+
ErrorReason = "Group is disintegrated";
61+
ReplyAndDie(NKikimrProto::ERROR);
62+
return;
63+
64+
case NKikimrProto::OK:
65+
case NKikimrProto::UNKNOWN:
66+
break;
67+
68+
default:
69+
Y_ABORT("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data());
70+
}
71+
72+
for (size_t i = 0; i < record.ResultSize(); ++i) {
73+
const auto& result = record.GetResult(i);
74+
BlobStatusTracker->UpdateFromResponseData(result, vDiskId, Info.Get());
75+
}
76+
77+
if (!VGetsInFlight) {
78+
Analyze();
79+
}
80+
}
81+
82+
void Analyze() {
83+
PendingResult.reset(new TEvBlobStorage::TEvGetResult(NKikimrProto::OK, 1, Info->GroupID));
84+
85+
auto& response = PendingResult->Responses[0];
86+
response.Id = Queries[0].Id;
87+
response.Shift = Queries[0].Shift;
88+
response.RequestedSize = Queries[0].Size;
89+
90+
TBlobStorageGroupInfo::EBlobState blobState = BlobStatusTracker->GetBlobState(Info.Get(), nullptr);
91+
92+
switch (blobState) {
93+
case TBlobStorageGroupInfo::EBS_DISINTEGRATED:
94+
ErrorReason = "Group is disintegrated";
95+
ReplyAndDie(NKikimrProto::ERROR);
96+
return;
97+
98+
case TBlobStorageGroupInfo::EBS_FULL:
99+
response.Status = NKikimrProto::OK;
100+
break;
101+
102+
case TBlobStorageGroupInfo::EBS_UNRECOVERABLE_FRAGMENTARY:
103+
case TBlobStorageGroupInfo::EBS_RECOVERABLE_FRAGMENTARY:
104+
response.Status = NKikimrProto::ERROR;
105+
break;
106+
107+
case TBlobStorageGroupInfo::EBS_RECOVERABLE_DOUBTED:
108+
response.Status = NKikimrProto::UNKNOWN;
109+
break;
110+
}
111+
112+
ReplyAndDie(NKikimrProto::OK);
113+
}
114+
115+
std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override {
116+
++*Mon->NodeMon->RestartCheckIntegrityGet;
117+
118+
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(Queries, 1, Deadline, GetHandleClass, false, false);
119+
ev->RestartCounter = counter;
120+
ev->CheckIntegrity = true;
121+
return ev;
122+
}
123+
124+
public:
125+
::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override {
126+
return Mon->ActiveCheckIntegrityGet;
127+
}
128+
129+
ERequestType GetRequestType() const override {
130+
return ERequestType::Get;
131+
}
132+
133+
TBlobStorageGroupCheckIntegrityGetRequest(TBlobStorageGroupCheckIntegrityGetParameters& params)
134+
: TBlobStorageGroupRequestActor(params)
135+
, QuerySize(params.Common.Event->QuerySize)
136+
, Queries(params.Common.Event->Queries.Release())
137+
, Deadline(params.Common.Event->Deadline)
138+
, GetHandleClass(params.Common.Event->GetHandleClass)
139+
, QuorumTracker(Info.Get())
140+
{}
141+
142+
void Bootstrap() override {
143+
if (QuerySize != 1) {
144+
ErrorReason = "CheckIntegrityGet can only be executed with one query";
145+
ReplyAndDie(NKikimrProto::ERROR);
146+
return;
147+
}
148+
149+
const TLogoBlobID& id = Queries[0].Id;
150+
if (id.FullID() != id) {
151+
ErrorReason = "CheckIntegrityGet can only be executed with full blob id";
152+
ReplyAndDie(NKikimrProto::ERROR);
153+
return;
154+
}
155+
156+
BlobStatusTracker.reset(new TBlobStatusTracker(id, Info.Get()));
157+
158+
for (const auto& vdisk : Info->GetVDisks()) {
159+
auto vDiskId = Info->GetVDiskId(vdisk.OrderNumber);
160+
161+
if (!Info->BelongsToSubgroup(vDiskId, id.Hash())) {
162+
continue;
163+
}
164+
165+
auto vGet = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(
166+
vDiskId,
167+
Deadline,
168+
NKikimrBlobStorage::EGetHandleClass::FastRead,
169+
TEvBlobStorage::TEvVGet::EFlags::ShowInternals);
170+
171+
vGet->AddExtremeQuery(id, 0, 0);
172+
173+
SendToQueue(std::move(vGet), 0);
174+
++VGetsInFlight;
175+
}
176+
177+
Become(&TBlobStorageGroupCheckIntegrityGetRequest::StateWait);
178+
}
179+
180+
STATEFN(StateWait) {
181+
if (ProcessEvent(ev)) {
182+
return;
183+
}
184+
switch (ev->GetTypeRewrite()) {
185+
hFunc(TEvBlobStorage::TEvVGetResult, Handle);
186+
}
187+
}
188+
};
189+
190+
IActor* CreateBlobStorageGroupCheckIntegrityGetRequest(TBlobStorageGroupCheckIntegrityGetParameters params) {
191+
return new TBlobStorageGroupCheckIntegrityGetRequest(params);
192+
}
193+
194+
} //NKikimr

ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
9898

9999
ActiveMultiGet = ActiveRequestsGroup->GetCounter("ActiveMultiGet");
100100
ActiveIndexRestoreGet = ActiveRequestsGroup->GetCounter("ActiveIndexRestoreGet");
101+
ActiveCheckIntegrityGet = ActiveRequestsGroup->GetCounter("ActiveCheckIntegrityGet");
101102
ActiveMultiCollect = ActiveRequestsGroup->GetCounter("ActiveMultiCollect");
102103

103104
auto respStatGroup = NodeMon->Group->GetSubgroup("subsystem", "responseStatus");
@@ -141,6 +142,8 @@ void TBlobStorageGroupProxyMon::BecomeFull() {
141142
Percentiles1);
142143
RangeResponseTime.Initialize(ResponseGroup, "event", "range", "Response in millisec", Percentiles1);
143144
PatchResponseTime.Initialize(ResponseGroup, "event", "patch", "Response in millisec", Percentiles1);
145+
CheckIntegrityGetResponseTime.Initialize(ResponseGroup, "event", "checkIntegrityGet", "Response in millisec",
146+
Percentiles1);
144147
}
145148
IsLimitedMon = false;
146149
}
@@ -201,6 +204,7 @@ void TBlobStorageGroupProxyMon::Update() {
201204
IndexRestoreGetResponseTime.Update();
202205
RangeResponseTime.Update();
203206
PatchResponseTime.Update();
207+
CheckIntegrityGetResponseTime.Update();
204208
}
205209

206210
BlockResponseTime.Update();
@@ -216,4 +220,3 @@ void TBlobStorageGroupProxyMon::ThroughputUpdate() {
216220

217221

218222
} // NKikimr
219-

ydb/core/blobstorage/dsproxy/dsproxy_mon.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
175175
NMonitoring::TPercentileTrackerLg<3, 4, 3> IndexRestoreGetResponseTime;
176176
NMonitoring::TPercentileTrackerLg<3, 4, 3> RangeResponseTime;
177177
NMonitoring::TPercentileTrackerLg<3, 4, 3> PatchResponseTime;
178+
NMonitoring::TPercentileTrackerLg<3, 4, 3> CheckIntegrityGetResponseTime;
178179

179180
// event counters
180181
TIntrusivePtr<::NMonitoring::TDynamicCounters> EventGroup;
@@ -224,6 +225,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
224225
::NMonitoring::TDynamicCounters::TCounterPtr EventStopGetBatching;
225226
::NMonitoring::TDynamicCounters::TCounterPtr EventPatch;
226227
::NMonitoring::TDynamicCounters::TCounterPtr EventAssimilate;
228+
::NMonitoring::TDynamicCounters::TCounterPtr EventCheckIntegrityGet;
227229

228230
::NMonitoring::TDynamicCounters::TCounterPtr PutsSentViaPutBatching;
229231
::NMonitoring::TDynamicCounters::TCounterPtr PutBatchesSent;
@@ -245,6 +247,7 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
245247
::NMonitoring::TDynamicCounters::TCounterPtr ActiveStatus;
246248
::NMonitoring::TDynamicCounters::TCounterPtr ActivePatch;
247249
::NMonitoring::TDynamicCounters::TCounterPtr ActiveAssimilate;
250+
::NMonitoring::TDynamicCounters::TCounterPtr ActiveCheckIntegrityGet;
248251

249252
std::optional<TResponseStatusGroup> RespStatPut;
250253
std::optional<TResponseStatusGroup> RespStatGet;
@@ -376,9 +379,13 @@ class TBlobStorageGroupProxyMon : public TThrRefBase {
376379
NodeMon->CountPatchResponseTime(type, duration);
377380
}
378381

382+
void CountCheckIntegrityGetResponseTime(TDuration duration) {
383+
CheckIntegrityGetResponseTime.Increment(duration.MilliSeconds());
384+
NodeMon->CheckIntegrityGetResponseTime.Increment(duration.MilliSeconds());
385+
}
386+
379387
void Update();
380388
void ThroughputUpdate();
381389
};
382390

383391
} // NKikimr
384-

ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters>
3737
percentiles1);
3838
RangeResponseTime.Initialize(Group, "event", "range", "latency", percentiles1);
3939
PatchResponseTime.Initialize(Group, "event", "patch", "latency", percentiles4);
40+
CheckIntegrityGetResponseTime.Initialize(Group, "event", "checkIntegrityGet", "latency",
41+
percentiles1);
4042

4143
IsCountersPresentedForIdx.fill(false);
4244
if (initForAllDeviceTypes) {
@@ -60,6 +62,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters>
6062
RestartIndexRestoreGet = group->GetCounter("EvIndexRestoreGet", true);
6163
RestartStatus = group->GetCounter("EvStatus", true);
6264
RestartAssimilate = group->GetCounter("EvAssimilate", true);
65+
RestartCheckIntegrityGet = group->GetCounter("EvCheckIntegrityGet", true);
6366
}
6467

6568
{

ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
6262
NMonitoring::TPercentileTracker<16, 512, 15> DiscoverResponseTime;
6363
NMonitoring::TPercentileTracker<16, 512, 15> IndexRestoreGetResponseTime;
6464
NMonitoring::TPercentileTracker<16, 512, 15> RangeResponseTime;
65+
NMonitoring::TPercentileTracker<16, 512, 15> CheckIntegrityGetResponseTime;
6566

6667
std::array<bool, KnownDeviceTypesCount> IsCountersPresentedForIdx;
6768

@@ -77,6 +78,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
7778
::NMonitoring::TDynamicCounters::TCounterPtr RestartStatus;
7879
::NMonitoring::TDynamicCounters::TCounterPtr RestartPatch;
7980
::NMonitoring::TDynamicCounters::TCounterPtr RestartAssimilate;
81+
::NMonitoring::TDynamicCounters::TCounterPtr RestartCheckIntegrityGet;
8082

8183
std::array<::NMonitoring::TDynamicCounters::TCounterPtr, 4> RestartHisto;
8284

@@ -130,4 +132,3 @@ struct TDsProxyNodeMon : public TThrRefBase {
130132
};
131133

132134
} // NKikimr
133-

ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class TDsProxyNodeMonActor : public TActorBootstrapped<TDsProxyNodeMonActor> {
6868
Mon->IndexRestoreGetResponseTime.Update();
6969
Mon->RangeResponseTime.Update();
7070
Mon->PatchResponseTime.Update();
71+
Mon->CheckIntegrityGetResponseTime.Update();
7172
}
7273

7374
void Handle(NMon::TEvHttpInfo::TPtr &ev) {
@@ -107,4 +108,3 @@ IActor* CreateDsProxyNodeMon(TIntrusivePtr<TDsProxyNodeMon> mon) {
107108
}
108109

109110
} // NKikimr
110-

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,28 @@ namespace NKikimr {
4242
EnsureMonitoring(true);
4343
LWTRACK(DSProxyGetHandle, ev->Get()->Orbit);
4444
EnableWilsonTracing(ev, Mon->GetSamplePPM);
45-
if (ev->Get()->IsIndexOnly) {
45+
46+
if (ev->Get()->CheckIntegrity) {
47+
Mon->EventCheckIntegrityGet->Inc();
48+
PushRequest(CreateBlobStorageGroupCheckIntegrityGetRequest(
49+
TBlobStorageGroupCheckIntegrityGetParameters{
50+
.Common = {
51+
.GroupInfo = Info,
52+
.GroupQueues = Sessions->GroupQueues,
53+
.Mon = Mon,
54+
.Source = ev->Sender,
55+
.Cookie = ev->Cookie,
56+
.Now = TActivationContext::Monotonic(),
57+
.StoragePoolCounters = StoragePoolCounters,
58+
.RestartCounter = ev->Get()->RestartCounter,
59+
.TraceId = std::move(ev->TraceId),
60+
.Event = ev->Get(),
61+
.ExecutionRelay = ev->Get()->ExecutionRelay,
62+
}
63+
}),
64+
ev->Get()->Deadline
65+
);
66+
} else if (ev->Get()->IsIndexOnly) {
4667
Mon->EventIndexRestoreGet->Inc();
4768
PushRequest(CreateBlobStorageGroupIndexRestoreGetRequest(
4869
TBlobStorageGroupRestoreGetParameters{

0 commit comments

Comments
 (0)