|
| 1 | +#include "dsproxy.h" |
| 2 | +#include "dsproxy_mon.h" |
| 3 | +#include "dsproxy_quorum_tracker.h" |
| 4 | +#include "dsproxy_blob_tracker.h" |
| 5 | +#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> |
| 6 | + |
| 7 | +namespace NKikimr { |
| 8 | + |
| 9 | +class TBlobStorageGroupCheckIntegrityGetRequest : public TBlobStorageGroupRequestActor { |
| 10 | + const ui32 QuerySize; |
| 11 | + TArrayHolder<TEvBlobStorage::TEvGet::TQuery> Queries; |
| 12 | + const TInstant Deadline; |
| 13 | + NKikimrBlobStorage::EGetHandleClass GetHandleClass; |
| 14 | + |
| 15 | + TGroupQuorumTracker QuorumTracker; |
| 16 | + std::unique_ptr<TBlobStatusTracker> BlobStatusTracker; |
| 17 | + ui32 VGetsInFlight = 0; |
| 18 | + |
| 19 | + std::unique_ptr<TEvBlobStorage::TEvGetResult> PendingResult; |
| 20 | + |
| 21 | + void ReplyAndDie(NKikimrProto::EReplyStatus status) override { |
| 22 | + Mon->CountCheckIntegrityGetResponseTime(TActivationContext::Monotonic() - RequestStartTime); |
| 23 | + |
| 24 | + if (status != NKikimrProto::OK) { |
| 25 | + PendingResult.reset(new TEvBlobStorage::TEvGetResult(status, 1, Info->GroupID)); |
| 26 | + PendingResult->ErrorReason = ErrorReason; |
| 27 | + |
| 28 | + auto& response = PendingResult->Responses[0]; |
| 29 | + response.Status = NKikimrProto::UNKNOWN; |
| 30 | + response.Id = Queries[0].Id; |
| 31 | + response.Shift = Queries[0].Shift; |
| 32 | + response.RequestedSize = Queries[0].Size; |
| 33 | + } |
| 34 | + |
| 35 | + SendResponseAndDie(std::move(PendingResult)); |
| 36 | + } |
| 37 | + |
| 38 | + TString DumpBlobStatus() const { |
| 39 | + TStringStream str; |
| 40 | + BlobStatusTracker->Output(str, Info.Get()); |
| 41 | + return str.Str(); |
| 42 | + } |
| 43 | + |
| 44 | + void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) { |
| 45 | + ProcessReplyFromQueue(ev->Get()); |
| 46 | + |
| 47 | + const NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record; |
| 48 | + |
| 49 | + Y_ABORT_UNLESS(record.HasStatus()); |
| 50 | + NKikimrProto::EReplyStatus status = record.GetStatus(); |
| 51 | + |
| 52 | + Y_ABORT_UNLESS(record.HasVDiskID()); |
| 53 | + const TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); |
| 54 | + |
| 55 | + Y_ABORT_UNLESS(VGetsInFlight > 0); |
| 56 | + --VGetsInFlight; |
| 57 | + |
| 58 | + switch (NKikimrProto::EReplyStatus newStatus = QuorumTracker.ProcessReply(vDiskId, status)) { |
| 59 | + case NKikimrProto::ERROR: |
| 60 | + ErrorReason = "Group is disintegrated"; |
| 61 | + ReplyAndDie(NKikimrProto::ERROR); |
| 62 | + return; |
| 63 | + |
| 64 | + case NKikimrProto::OK: |
| 65 | + case NKikimrProto::UNKNOWN: |
| 66 | + break; |
| 67 | + |
| 68 | + default: |
| 69 | + Y_ABORT("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data()); |
| 70 | + } |
| 71 | + |
| 72 | + for (size_t i = 0; i < record.ResultSize(); ++i) { |
| 73 | + const auto& result = record.GetResult(i); |
| 74 | + BlobStatusTracker->UpdateFromResponseData(result, vDiskId, Info.Get()); |
| 75 | + } |
| 76 | + |
| 77 | + if (!VGetsInFlight) { |
| 78 | + Analyze(); |
| 79 | + } |
| 80 | + } |
| 81 | + |
| 82 | + void Analyze() { |
| 83 | + PendingResult.reset(new TEvBlobStorage::TEvGetResult(NKikimrProto::OK, 1, Info->GroupID)); |
| 84 | + |
| 85 | + auto& response = PendingResult->Responses[0]; |
| 86 | + response.Id = Queries[0].Id; |
| 87 | + response.Shift = Queries[0].Shift; |
| 88 | + response.RequestedSize = Queries[0].Size; |
| 89 | + |
| 90 | + TBlobStorageGroupInfo::EBlobState blobState = BlobStatusTracker->GetBlobState(Info.Get(), nullptr); |
| 91 | + |
| 92 | + switch (blobState) { |
| 93 | + case TBlobStorageGroupInfo::EBS_DISINTEGRATED: |
| 94 | + ErrorReason = "Group is disintegrated"; |
| 95 | + ReplyAndDie(NKikimrProto::ERROR); |
| 96 | + return; |
| 97 | + |
| 98 | + case TBlobStorageGroupInfo::EBS_FULL: |
| 99 | + response.Status = NKikimrProto::OK; |
| 100 | + break; |
| 101 | + |
| 102 | + case TBlobStorageGroupInfo::EBS_UNRECOVERABLE_FRAGMENTARY: |
| 103 | + case TBlobStorageGroupInfo::EBS_RECOVERABLE_FRAGMENTARY: |
| 104 | + response.Status = NKikimrProto::ERROR; |
| 105 | + break; |
| 106 | + |
| 107 | + case TBlobStorageGroupInfo::EBS_RECOVERABLE_DOUBTED: |
| 108 | + response.Status = NKikimrProto::UNKNOWN; |
| 109 | + break; |
| 110 | + } |
| 111 | + |
| 112 | + ReplyAndDie(NKikimrProto::OK); |
| 113 | + } |
| 114 | + |
| 115 | + std::unique_ptr<IEventBase> RestartQuery(ui32 counter) override { |
| 116 | + ++*Mon->NodeMon->RestartCheckIntegrityGet; |
| 117 | + |
| 118 | + auto ev = std::make_unique<TEvBlobStorage::TEvGet>(Queries, 1, Deadline, GetHandleClass, false, false); |
| 119 | + ev->RestartCounter = counter; |
| 120 | + ev->CheckIntegrity = true; |
| 121 | + return ev; |
| 122 | + } |
| 123 | + |
| 124 | +public: |
| 125 | + ::NMonitoring::TDynamicCounters::TCounterPtr& GetActiveCounter() const override { |
| 126 | + return Mon->ActiveCheckIntegrityGet; |
| 127 | + } |
| 128 | + |
| 129 | + ERequestType GetRequestType() const override { |
| 130 | + return ERequestType::Get; |
| 131 | + } |
| 132 | + |
| 133 | + TBlobStorageGroupCheckIntegrityGetRequest(TBlobStorageGroupCheckIntegrityGetParameters& params) |
| 134 | + : TBlobStorageGroupRequestActor(params) |
| 135 | + , QuerySize(params.Common.Event->QuerySize) |
| 136 | + , Queries(params.Common.Event->Queries.Release()) |
| 137 | + , Deadline(params.Common.Event->Deadline) |
| 138 | + , GetHandleClass(params.Common.Event->GetHandleClass) |
| 139 | + , QuorumTracker(Info.Get()) |
| 140 | + {} |
| 141 | + |
| 142 | + void Bootstrap() override { |
| 143 | + if (QuerySize != 1) { |
| 144 | + ErrorReason = "CheckIntegrityGet can only be executed with one query"; |
| 145 | + ReplyAndDie(NKikimrProto::ERROR); |
| 146 | + return; |
| 147 | + } |
| 148 | + |
| 149 | + const TLogoBlobID& id = Queries[0].Id; |
| 150 | + if (id.FullID() != id) { |
| 151 | + ErrorReason = "CheckIntegrityGet can only be executed with full blob id"; |
| 152 | + ReplyAndDie(NKikimrProto::ERROR); |
| 153 | + return; |
| 154 | + } |
| 155 | + |
| 156 | + BlobStatusTracker.reset(new TBlobStatusTracker(id, Info.Get())); |
| 157 | + |
| 158 | + for (const auto& vdisk : Info->GetVDisks()) { |
| 159 | + auto vDiskId = Info->GetVDiskId(vdisk.OrderNumber); |
| 160 | + |
| 161 | + if (!Info->BelongsToSubgroup(vDiskId, id.Hash())) { |
| 162 | + continue; |
| 163 | + } |
| 164 | + |
| 165 | + auto vGet = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery( |
| 166 | + vDiskId, |
| 167 | + Deadline, |
| 168 | + NKikimrBlobStorage::EGetHandleClass::FastRead, |
| 169 | + TEvBlobStorage::TEvVGet::EFlags::ShowInternals); |
| 170 | + |
| 171 | + vGet->AddExtremeQuery(id, 0, 0); |
| 172 | + |
| 173 | + SendToQueue(std::move(vGet), 0); |
| 174 | + ++VGetsInFlight; |
| 175 | + } |
| 176 | + |
| 177 | + Become(&TBlobStorageGroupCheckIntegrityGetRequest::StateWait); |
| 178 | + } |
| 179 | + |
| 180 | + STATEFN(StateWait) { |
| 181 | + if (ProcessEvent(ev)) { |
| 182 | + return; |
| 183 | + } |
| 184 | + switch (ev->GetTypeRewrite()) { |
| 185 | + hFunc(TEvBlobStorage::TEvVGetResult, Handle); |
| 186 | + } |
| 187 | + } |
| 188 | +}; |
| 189 | + |
| 190 | +IActor* CreateBlobStorageGroupCheckIntegrityGetRequest(TBlobStorageGroupCheckIntegrityGetParameters params) { |
| 191 | + return new TBlobStorageGroupCheckIntegrityGetRequest(params); |
| 192 | +} |
| 193 | + |
| 194 | +} //NKikimr |
0 commit comments