Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 206 additions & 53 deletions ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ namespace NKikimr::NStorage {
const TActorId RequestSessionId;

TActorId ParentId;

TActorId InterconnectSessionId;
ui32 ConnectedPeerNodeId = 0;
ui32 WaitingReplyFromNode = 0;

using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot;
using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult;
Expand Down Expand Up @@ -43,53 +41,56 @@ namespace NKikimr::NStorage {
Become(&TThis::StateFunc);

if (auto scepter = Scepter.lock()) {
// remove unnecessary subscription, if any
UnsubscribeInterconnect();
ExecuteQuery();
} else if (Self->Binding) {
if (RequestSessionId) {
FinishWithError(TResult::ERROR, "no double-hop invokes allowed");
} else if (Self->Binding->RootNodeId != ConnectedPeerNodeId) { // subscribe to session first
Send(TActivationContext::InterconnectProxy(Self->Binding->RootNodeId), new TEvInterconnect::TEvConnectNode);
UnsubscribeInterconnect();
} else { // session is already established, forward event to peer node
Y_ABORT_UNLESS(Event);
auto ev = IEventHandle::Forward(std::exchange(Event, {}), MakeBlobStorageNodeWardenID(ConnectedPeerNodeId));
ev->Rewrite(TEvInterconnect::EvForward, InterconnectSessionId);
TActivationContext::Send(ev.release());
}
} else {
} else if (!Self->Binding) {
FinishWithError(TResult::NO_QUORUM, "no quorum obtained");
} else if (RequestSessionId) {
FinishWithError(TResult::ERROR, "no double-hop invokes allowed");
} else {
const ui32 root = Self->Binding->RootNodeId;
Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession);
const auto [it, inserted] = Subscriptions.try_emplace(root);
Y_ABORT_UNLESS(inserted);
WaitingReplyFromNode = root;
}
}

void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev) {
if (ev->HasEvent()) {
Finish(Sender, SelfId(), ev->ReleaseBase().Release(), ev->Flags, Cookie);
} else {
Finish(ev->Type, ev->Flags, Sender, SelfId(), ev->ReleaseChainBuffer(), Cookie);
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Interconnect machinery

THashMap<ui32, TActorId> Subscriptions;

void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
// remember actor id of interconnect session to unsubcribe later
InterconnectSessionId = ev->Sender;
ConnectedPeerNodeId = ev->Get()->NodeId;
// restart query from the beginning
Bootstrap(ParentId);
const ui32 nodeId = ev->Get()->NodeId;
if (const auto it = Subscriptions.find(nodeId); it != Subscriptions.end()) {
it->second = ev->Sender;
}
}

void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr /*ev*/) {
FinishWithError(TResult::ERROR, "root node disconnected");
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
const ui32 nodeId = ev->Get()->NodeId;
Subscriptions.erase(nodeId);
if (nodeId == WaitingReplyFromNode) {
FinishWithError(TResult::ERROR, "root node disconnected");
}
for (auto [begin, end] = NodeToVDisk.equal_range(nodeId); begin != end; ++begin) {
OnVStatusError(begin->second);
}
}

void UnsubscribeInterconnect() {
if (const TActorId actorId = std::exchange(InterconnectSessionId, {})) {
for (auto it = Subscriptions.begin(); it != Subscriptions.end(); ) {
const TActorId actorId = it->second ? it->second : TActivationContext::InterconnectProxy(it->first);
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, actorId, SelfId(), nullptr, 0));
ConnectedPeerNodeId = 0;
}
}

void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev) {
if (ev->HasEvent()) {
Finish(Sender, SelfId(), ev->ReleaseBase().Release(), ev->Flags, Cookie);
} else {
Finish(ev->Type, ev->Flags, Sender, SelfId(), ev->ReleaseChainBuffer(), Cookie);
Subscriptions.erase(it++);
}
}

Expand Down Expand Up @@ -169,22 +170,163 @@ namespace NKikimr::NStorage {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Reassign group disk logic

void ReassignGroupDisk(const NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TReassignGroupDisk& /*cmd*/) {
void ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd) {
if (!RunCommonChecks()) {
return;
}

bool found = false;
const ui32 groupId = cmd.GetVDiskId().GetGroupID();
for (const auto& group : Self->StorageConfig->GetBlobStorageConfig().GetServiceSet().GetGroups()) {
if (group.GetGroupID() == groupId) {
found = true;
if (!cmd.GetIgnoreGroupFailModelChecks()) {
IssueVStatusQueries(group);
}
break;
}
}
if (!found) {
return FinishWithError(TResult::ERROR, TStringBuilder() << "GroupId# " << groupId << " not found");
}

Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryBaseConfig);
}

THashMultiMap<ui32, TVDiskID> NodeToVDisk;
THashMap<TActorId, TVDiskID> ActorToVDisk;
std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig;
THashSet<TVDiskID> PendingVDiskIds;
TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo;
std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks;

void IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group) {
TStringStream err;
GroupInfo = TBlobStorageGroupInfo::Parse(group, nullptr, &err);
if (!GroupInfo) {
return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to parse group info: " << err.Str());
}
SuccessfulVDisks.emplace(&GroupInfo->GetTopology());

for (ui32 i = 0, num = GroupInfo->GetTotalVDisksNum(); i < num; ++i) {
const TVDiskID vdiskId = GroupInfo->GetVDiskId(i);
const TActorId actorId = GroupInfo->GetActorId(i);
const ui32 flags = IEventHandle::FlagTrackDelivery |
(actorId.NodeId() == SelfId().NodeId() ? 0 : IEventHandle::FlagSubscribeOnSession);
STLOG(PRI_DEBUG, BS_NODE, NW53, "sending TEvVStatus", (SelfId, SelfId()), (VDiskId, vdiskId),
(ActorId, actorId));
Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId), flags);
if (actorId.NodeId() != SelfId().NodeId()) {
NodeToVDisk.emplace(actorId.NodeId(), vdiskId);
}
ActorToVDisk.emplace(actorId, vdiskId);
PendingVDiskIds.emplace(vdiskId);
}
}

void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev) {
const auto& record = ev->Get()->Record;
const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());
STLOG(PRI_DEBUG, BS_NODE, NW54, "TEvVStatusResult", (SelfId, SelfId()), (Record, record), (VDiskId, vdiskId));
if (!PendingVDiskIds.erase(vdiskId)) {
return FinishWithError(TResult::ERROR, TStringBuilder() << "TEvVStatusResult VDiskID# " << vdiskId
<< " is unexpected");
}
if (record.GetJoinedGroup() && record.GetReplicated()) {
*SuccessfulVDisks |= {&GroupInfo->GetTopology(), vdiskId};
}
CheckReassignGroupDisk();
}

void Handle(TEvents::TEvUndelivered::TPtr ev) {
if (const auto it = ActorToVDisk.find(ev->Sender); it != ActorToVDisk.end()) {
Y_ABORT_UNLESS(ev->Get()->SourceType == TEvBlobStorage::EvVStatus);
OnVStatusError(it->second);
}
}

void OnVStatusError(TVDiskID vdiskId) {
PendingVDiskIds.erase(vdiskId);
CheckReassignGroupDisk();
}

void Handle(TEvNodeWardenBaseConfig::TPtr ev) {
BaseConfig.emplace(std::move(ev->Get()->BaseConfig));
CheckReassignGroupDisk();
}

void CheckReassignGroupDisk() {
if (BaseConfig && PendingVDiskIds.empty()) {
ReassignGroupDiskExecute();
}
}

void ReassignGroupDiskExecute() {
const auto& record = Event->Get()->Record;
const auto& cmd = record.GetReassignGroupDisk();

if (!RunCommonChecks()) {
if (Scepter.expired()) {
return FinishWithError(TResult::ERROR, "scepter lost during query execution");
} else if (!RunCommonChecks()) {
return;
}

NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
STLOG(PRI_DEBUG, BS_NODE, NW55, "ReassignGroupDiskExecute", (SelfId, SelfId()));

const auto& vdiskId = VDiskIDFromVDiskID(cmd.GetVDiskId());

ui64 maxSlotSize = 0;

if (SuccessfulVDisks) {
const auto& checker = GroupInfo->GetQuorumChecker();

auto check = [&](auto failedVDisks, const char *base) {
bool wasDegraded = checker.IsDegraded(failedVDisks) && checker.CheckFailModelForGroup(failedVDisks);
failedVDisks |= {&GroupInfo->GetTopology(), vdiskId};

if (!checker.CheckFailModelForGroup(failedVDisks)) {
FinishWithError(TResult::ERROR, TStringBuilder()
<< "ReassignGroupDisk would render group inoperable (" << base << ')');
} else if (!cmd.GetIgnoreDegradedGroupsChecks() && !wasDegraded && checker.IsDegraded(failedVDisks)) {
FinishWithError(TResult::ERROR, TStringBuilder()
<< "ReassignGroupDisk would drive group into degraded state (" << base << ')');
} else {
return true;
}

return false;
};

if (!check(~SuccessfulVDisks.value(), "polling")) {
return;
}

// scan failed disks according to BS_CONTROLLER's data
TBlobStorageGroupInfo::TGroupVDisks failedVDisks(&GroupInfo->GetTopology());
for (const auto& vslot : BaseConfig->GetVSlot()) {
if (vslot.GetGroupId() != vdiskId.GroupID || vslot.GetGroupGeneration() != vdiskId.GroupGeneration) {
continue;
}
if (!vslot.GetReady()) {
const TVDiskID vdiskId(vslot.GetGroupId(), vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(),
vslot.GetFailDomainIdx(), vslot.GetVDiskIdx());
failedVDisks |= {&GroupInfo->GetTopology(), vdiskId};
}
if (vslot.HasVDiskMetrics()) {
const auto& m = vslot.GetVDiskMetrics();
if (m.HasAllocatedSize()) {
maxSlotSize = Max(maxSlotSize, m.GetAllocatedSize());
}
}
}

if (!check(failedVDisks, "BS_CONTROLLER state")) {
return;
}
}

NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;

if (!config.HasBlobStorageConfig()) {
return FinishWithError(TResult::ERROR, "no BlobStorageConfig defined");
}
Expand Down Expand Up @@ -227,11 +369,12 @@ namespace NKikimr::NStorage {
try {
Self->AllocateStaticGroup(&config, vdiskId.GroupID, vdiskId.GroupGeneration + 1,
TBlobStorageGroupType((TBlobStorageGroupType::EErasureSpecies)group.GetErasureSpecies()),
settings.GetGeometry(), settings.GetPDiskFilter(), replacedDisks, forbid, 0,
&ev->Get()->BaseConfig, cmd.GetConvertToDonor());
settings.GetGeometry(), settings.GetPDiskFilter(), replacedDisks, forbid, maxSlotSize,
&BaseConfig.value(), cmd.GetConvertToDonor());
} catch (const TExConfigError& ex) {
STLOG(PRI_NOTICE, BS_NODE, NW49, "ReassignGroupDisk failed to allocate group", (Config, config),
(BaseConfig, ev->Get()->BaseConfig),
STLOG(PRI_NOTICE, BS_NODE, NW49, "ReassignGroupDisk failed to allocate group", (SelfId, SelfId()),
(Config, config),
(BaseConfig, *BaseConfig),
(Error, ex.what()));
return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to allocate group: " << ex.what());
}
Expand All @@ -244,11 +387,14 @@ namespace NKikimr::NStorage {
return FinishWithError(TResult::ERROR, TStringBuilder() << "group not found");
}

void StaticVDiskSlain(const NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TStaticVDiskSlain& cmd) {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// VDiskSlain/DropDonor logic

void StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd) {
HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), false);
}

void DropDonor(const NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TDropDonor& cmd) {
void DropDonor(const TQuery::TDropDonor& cmd) {
HandleDropDonorAndSlain(VDiskIDFromVDiskID(cmd.GetVDiskId()), cmd.GetVSlotId(), true);
}

Expand Down Expand Up @@ -365,7 +511,7 @@ namespace NKikimr::NStorage {
UpdateFingerprint(config);

if (auto error = ValidateConfigUpdate(*Self->StorageConfig, *config)) {
STLOG(PRI_DEBUG, BS_NODE, NW51, "proposed config validation failed", (Error, *error),
STLOG(PRI_DEBUG, BS_NODE, NW51, "proposed config validation failed", (SelfId, SelfId()), (Error, *error),
(Config, config));
return FinishWithError(TResult::ERROR, TStringBuilder() << "config validation failed: " << *error);
}
Expand Down Expand Up @@ -434,14 +580,21 @@ namespace NKikimr::NStorage {
TActorBootstrapped::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvNodeConfigInvokeOnRootResult, Handle);
hFunc(TEvNodeConfigGather, Handle);
hFunc(TEvInterconnect::TEvNodeConnected, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(TEvNodeWardenBaseConfig, Handle);
cFunc(TEvents::TSystem::Poison, PassAway);
)
STFUNC(StateFunc) {
if (LifetimeToken.expired()) {
return FinishWithError(TResult::ERROR, "distributed config keeper terminated");
}
STRICT_STFUNC_BODY(
hFunc(TEvNodeConfigInvokeOnRootResult, Handle);
hFunc(TEvNodeConfigGather, Handle);
hFunc(TEvInterconnect::TEvNodeConnected, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(TEvBlobStorage::TEvVStatusResult, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvNodeWardenBaseConfig, Handle);
cFunc(TEvents::TSystem::Poison, PassAway);
)
}
};

void TDistributedConfigKeeper::Handle(TEvNodeConfigInvokeOnRoot::TPtr ev) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/mind/bscontroller/cmds_storage_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ namespace NKikimr::NBsController {
});

if (!virtualGroupsOnly) {
const TMonotonic mono = TActivationContext::Monotonic();

// apply static group
for (const auto& [pdiskId, pdisk] : StaticPDisks) {
if (PDisks.Find(pdiskId)) {
Expand Down Expand Up @@ -564,11 +566,11 @@ namespace NKikimr::NBsController {
x->MutableVDiskMetrics()->ClearVDiskId();
}
x->SetStatus(NKikimrBlobStorage::EVDiskStatus_Name(vslot.VDiskStatus));
x->SetReady(vslot.ReadySince <= mono);
}
if (const auto& s = Self.StorageConfig; s.HasBlobStorageConfig()) {
if (const auto& bsConfig = s.GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
const auto& ss = bsConfig.GetServiceSet();
const TMonotonic mono = TActivationContext::Monotonic();
for (const auto& group : ss.GetGroups()) {
auto *x = pb->AddGroup();
x->SetGroupId(group.GetGroupID());
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/blobstorage_distributed_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ message TEvNodeConfigInvokeOnRoot {
NKikimrBlobStorage.TVDiskID VDiskId = 1; // which one to reassign
optional NKikimrBlobStorage.TPDiskId PDiskId = 2; // where to put it (optional)
bool ConvertToDonor = 3; // convert the current disk to donor?
bool IgnoreGroupFailModelChecks = 4;
bool IgnoreDegradedGroupsChecks = 5;
}

// Regenerate configuration so the slain VDisk is no more reported as DESTROY one in the list.
Expand Down