Skip to content

Fix scrubbing and flapping unittest #1640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/backpressure/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class TBlobStorageQueue {
return Queues.InFlight.size();
}

ui64 GetInFlightCost() const {
return InFlightCost;
}

void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
const TBlobStorageGroupType& type);
void InvalidateCosts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
<< " msgId# " << msgId << " sequenceId# " << sequenceId
<< " expectedMsgId# " << expectedMsgId << " expectedSequenceId# " << expectedSequenceId
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws));
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws)
<< " InFlightCost# " << Queue.GetInFlightCost()
<< " InFlightCount# " << Queue.InFlightCount()
<< " ItemsWaiting# " << Queue.GetItemsWaiting()
<< " BytesWaiting# " << Queue.GetBytesWaiting());

switch (ws) {
case NKikimrBlobStorage::TWindowFeedback::IncorrectMsgId:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ namespace NKikimr {
}
}

TWindowStatus *Processed(bool checkMsgId, const TMessageId &msgId, ui64 cost, TWindowStatus *opStatus) {
Y_UNUSED(checkMsgId);
Y_UNUSED(msgId);
TWindowStatus *Processed(bool /*checkMsgId*/, const TMessageId& /*msgId*/, ui64 cost, TWindowStatus *opStatus) {
Y_ABORT_UNLESS(Cost >= cost);
Cost -= cost;
--InFlight;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl
msg->SetId(ReaderTabletData->Id);
msg->SetGeneration(ReaderTabletData->Generation);
}
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
<< " vget# " << vget->ToString());
}

for (auto& vget : gets) {
if (vget) {
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
<< Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
<< " vget# " << vget->ToString());
outVGets.push_back(std::move(vget));
++RequestIndex;
}
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ struct TPDiskMockState::TImpl {
}
}

bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
const ui64 chunkBegin = ui64(chunkIdx) * ChunkSize;
return static_cast<bool>(Corrupted & TIntervalSet{chunkBegin + begin, chunkBegin + end});
}

std::set<ui32> GetChunks() {
std::set<ui32> res;
for (auto& [ownerId, owner] : Owners) {
Expand Down Expand Up @@ -293,6 +298,10 @@ void TPDiskMockState::SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool
Impl->SetCorruptedArea(chunkIdx, begin, end, enabled);
}

bool TPDiskMockState::HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
return Impl->HasCorruptedArea(chunkIdx, begin, end);
}

std::set<ui32> TPDiskMockState::GetChunks() {
return Impl->GetChunks();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace NKikimr {
~TPDiskMockState();

void SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool enabled);
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end);
std::set<ui32> GetChunks();
TMaybe<NPDisk::TOwnerRound> GetOwnerRound(const TVDiskID& vDiskId) const;
ui32 GetChunkSize() const;
Expand Down
152 changes: 109 additions & 43 deletions ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,130 @@ void Test() {

TString data = TString::Uninitialized(8_MB);
memset(data.Detach(), 'X', data.size());
TLogoBlobID id(1, 1, 1, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}

auto checkReadable = [&](NKikimrProto::EReplyStatus status) {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, status);
if (status == NKikimrProto::OK) {
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
for (ui32 step = 1; step < 100; ++step) {
TLogoBlobID id(1, 1, step, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}
};

checkReadable(NKikimrProto::OK);
auto checkReadable = [&] {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);

ui32 partsMask = 0;
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TVDiskID& vdiskId = info->GetVDiskId(i);
env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
const TActorId sender = runtime->AllocateEdgeActor(1);
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead);
ev->AddExtremeQuery(id, 0, 0);
runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId());
auto reply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender);
auto& record = reply->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
for (const auto& result : record.GetResult()) {
if (result.GetStatus() == NKikimrProto::OK) {
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
UNIT_ASSERT(id.PartId());
const ui32 partIdx = id.PartId() - 1;
const ui32 mask = 1 << partIdx;
UNIT_ASSERT(!(partsMask & mask));
partsMask |= mask;
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
}
}
});
}
UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1);
};

for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);
checkReadable();

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
ui32 mask = 0;

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) {
const TDiskPart& part = item.Location;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true);
break;
ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
mask |= 1 << i;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true);
break;
}
}

checkReadable();
}

checkReadable(NKikimrProto::OK);
}
env.Sim(TDuration::Seconds(60));

env.Sim(TDuration::Seconds(60));
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
if (~mask >> i & 1) {
continue;
}

const TActorId vdiskActorId = info->GetActorId(i);

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

bool anyPartReadable = false;

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
anyPartReadable = !it->second->HasCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size);
if (anyPartReadable) {
break;
}
}
}

UNIT_ASSERT(anyPartReadable);
}
}
}

Y_UNIT_TEST_SUITE(ScrubFast) {
Y_UNIT_TEST(SingleBlob) {
Test();
}
}

4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ namespace NKikimr {

// a map to fill upon receiving VGet result
struct TPerBlobInfo {
const TInstant Deadline;
std::weak_ptr<TInFlightContext> Context;
TEvRecoverBlobResult::TItem *Item; // item to update
ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob
};
std::unordered_multimap<TLogoBlobID, TPerBlobInfo, THash<TLogoBlobID>> VGetResultMap;
std::set<std::tuple<TVDiskIdShort, TLogoBlobID>> GetsInFlight;

void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr<TInFlightContext>& context, TEvRecoverBlobResult::TItem *item);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup);
void SendPendingQueries();
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev);
NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item);
Expand Down
Loading