@@ -13,9 +13,7 @@ namespace NKikimr::NStorage {
1313 const TActorId RequestSessionId;
1414
1515 TActorId ParentId;
16-
17- TActorId InterconnectSessionId;
18- ui32 ConnectedPeerNodeId = 0 ;
16+ ui32 WaitingReplyFromNode = 0 ;
1917
2018 using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot;
2119 using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult;
@@ -43,53 +41,56 @@ namespace NKikimr::NStorage {
4341 Become (&TThis::StateFunc);
4442
4543 if (auto scepter = Scepter.lock ()) {
46- // remove unnecessary subscription, if any
47- UnsubscribeInterconnect ();
4844 ExecuteQuery ();
49- } else if (Self->Binding ) {
50- if (RequestSessionId) {
51- FinishWithError (TResult::ERROR, " no double-hop invokes allowed" );
52- } else if (Self->Binding ->RootNodeId != ConnectedPeerNodeId) { // subscribe to session first
53- Send (TActivationContext::InterconnectProxy (Self->Binding ->RootNodeId ), new TEvInterconnect::TEvConnectNode);
54- UnsubscribeInterconnect ();
55- } else { // session is already established, forward event to peer node
56- Y_ABORT_UNLESS (Event);
57- auto ev = IEventHandle::Forward (std::exchange (Event, {}), MakeBlobStorageNodeWardenID (ConnectedPeerNodeId));
58- ev->Rewrite (TEvInterconnect::EvForward, InterconnectSessionId);
59- TActivationContext::Send (ev.release ());
60- }
61- } else {
45+ } else if (!Self->Binding ) {
6246 FinishWithError (TResult::NO_QUORUM, " no quorum obtained" );
47+ } else if (RequestSessionId) {
48+ FinishWithError (TResult::ERROR, " no double-hop invokes allowed" );
49+ } else {
50+ const ui32 root = Self->Binding ->RootNodeId ;
51+ Send (MakeBlobStorageNodeWardenID (root), Event->Release (), IEventHandle::FlagSubscribeOnSession);
52+ const auto [it, inserted] = Subscriptions.try_emplace (root);
53+ Y_ABORT_UNLESS (inserted);
54+ WaitingReplyFromNode = root;
55+ }
56+ }
57+
58+ void Handle (TEvNodeConfigInvokeOnRootResult::TPtr ev) {
59+ if (ev->HasEvent ()) {
60+ Finish (Sender, SelfId (), ev->ReleaseBase ().Release (), ev->Flags , Cookie);
61+ } else {
62+ Finish (ev->Type , ev->Flags , Sender, SelfId (), ev->ReleaseChainBuffer (), Cookie);
6363 }
6464 }
6565
6666 // //////////////////////////////////////////////////////////////////////////////////////////////////////////////
6767 // Interconnect machinery
6868
69+ THashMap<ui32, TActorId> Subscriptions;
70+
6971 void Handle (TEvInterconnect::TEvNodeConnected::TPtr ev) {
70- // remember actor id of interconnect session to unsubcribe later
71- InterconnectSessionId = ev->Sender ;
72- ConnectedPeerNodeId = ev->Get ()->NodeId ;
73- // restart query from the beginning
74- Bootstrap (ParentId);
72+ const ui32 nodeId = ev->Get ()->NodeId ;
73+ if (const auto it = Subscriptions.find (nodeId); it != Subscriptions.end ()) {
74+ it->second = ev->Sender ;
75+ }
7576 }
7677
77- void Handle (TEvInterconnect::TEvNodeDisconnected::TPtr /* ev*/ ) {
78- FinishWithError (TResult::ERROR, " root node disconnected" );
78+ void Handle (TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
79+ const ui32 nodeId = ev->Get ()->NodeId ;
80+ Subscriptions.erase (nodeId);
81+ if (nodeId == WaitingReplyFromNode) {
82+ FinishWithError (TResult::ERROR, " root node disconnected" );
83+ }
84+ for (auto [begin, end] = NodeToVDisk.equal_range (nodeId); begin != end; ++begin) {
85+ OnVStatusError (begin->second );
86+ }
7987 }
8088
8189 void UnsubscribeInterconnect () {
82- if (const TActorId actorId = std::exchange (InterconnectSessionId, {})) {
90+ for (auto it = Subscriptions.begin (); it != Subscriptions.end (); ) {
91+ const TActorId actorId = it->second ? it->second : TActivationContext::InterconnectProxy (it->first );
8392 TActivationContext::Send (new IEventHandle (TEvents::TSystem::Unsubscribe, 0 , actorId, SelfId (), nullptr , 0 ));
84- ConnectedPeerNodeId = 0 ;
85- }
86- }
87-
88- void Handle (TEvNodeConfigInvokeOnRootResult::TPtr ev) {
89- if (ev->HasEvent ()) {
90- Finish (Sender, SelfId (), ev->ReleaseBase ().Release (), ev->Flags , Cookie);
91- } else {
92- Finish (ev->Type , ev->Flags , Sender, SelfId (), ev->ReleaseChainBuffer (), Cookie);
93+ Subscriptions.erase (it++);
9394 }
9495 }
9596
@@ -169,22 +170,163 @@ namespace NKikimr::NStorage {
169170 // //////////////////////////////////////////////////////////////////////////////////////////////////////////////
170171 // Reassign group disk logic
171172
172- void ReassignGroupDisk (const NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TReassignGroupDisk& /* cmd*/ ) {
173+ void ReassignGroupDisk (const TQuery::TReassignGroupDisk& cmd) {
174+ if (!RunCommonChecks ()) {
175+ return ;
176+ }
177+
178+ bool found = false ;
179+ const ui32 groupId = cmd.GetVDiskId ().GetGroupID ();
180+ for (const auto & group : Self->StorageConfig ->GetBlobStorageConfig ().GetServiceSet ().GetGroups ()) {
181+ if (group.GetGroupID () == groupId) {
182+ found = true ;
183+ if (!cmd.GetIgnoreGroupFailModelChecks ()) {
184+ IssueVStatusQueries (group);
185+ }
186+ break ;
187+ }
188+ }
189+ if (!found) {
190+ return FinishWithError (TResult::ERROR, TStringBuilder () << " GroupId# " << groupId << " not found" );
191+ }
192+
173193 Send (MakeBlobStorageNodeWardenID (SelfId ().NodeId ()), new TEvNodeWardenQueryBaseConfig);
174194 }
175195
196+ THashMultiMap<ui32, TVDiskID> NodeToVDisk;
197+ THashMap<TActorId, TVDiskID> ActorToVDisk;
198+ std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig;
199+ THashSet<TVDiskID> PendingVDiskIds;
200+ TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo;
201+ std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks;
202+
203+ void IssueVStatusQueries (const NKikimrBlobStorage::TGroupInfo& group) {
204+ TStringStream err;
205+ GroupInfo = TBlobStorageGroupInfo::Parse (group, nullptr , &err);
206+ if (!GroupInfo) {
207+ return FinishWithError (TResult::ERROR, TStringBuilder () << " failed to parse group info: " << err.Str ());
208+ }
209+ SuccessfulVDisks.emplace (&GroupInfo->GetTopology ());
210+
211+ for (ui32 i = 0 , num = GroupInfo->GetTotalVDisksNum (); i < num; ++i) {
212+ const TVDiskID vdiskId = GroupInfo->GetVDiskId (i);
213+ const TActorId actorId = GroupInfo->GetActorId (i);
214+ const ui32 flags = IEventHandle::FlagTrackDelivery |
215+ (actorId.NodeId () == SelfId ().NodeId () ? 0 : IEventHandle::FlagSubscribeOnSession);
216+ STLOG (PRI_DEBUG, BS_NODE, NW53, " sending TEvVStatus" , (SelfId, SelfId ()), (VDiskId, vdiskId),
217+ (ActorId, actorId));
218+ Send (actorId, new TEvBlobStorage::TEvVStatus (vdiskId), flags);
219+ if (actorId.NodeId () != SelfId ().NodeId ()) {
220+ NodeToVDisk.emplace (actorId.NodeId (), vdiskId);
221+ }
222+ ActorToVDisk.emplace (actorId, vdiskId);
223+ PendingVDiskIds.emplace (vdiskId);
224+ }
225+ }
226+
227+ void Handle (TEvBlobStorage::TEvVStatusResult::TPtr ev) {
228+ const auto & record = ev->Get ()->Record ;
229+ const TVDiskID vdiskId = VDiskIDFromVDiskID (record.GetVDiskID ());
230+ STLOG (PRI_DEBUG, BS_NODE, NW54, " TEvVStatusResult" , (SelfId, SelfId ()), (Record, record), (VDiskId, vdiskId));
231+ if (!PendingVDiskIds.erase (vdiskId)) {
232+ return FinishWithError (TResult::ERROR, TStringBuilder () << " TEvVStatusResult VDiskID# " << vdiskId
233+ << " is unexpected" );
234+ }
235+ if (record.GetJoinedGroup () && record.GetReplicated ()) {
236+ *SuccessfulVDisks |= {&GroupInfo->GetTopology (), vdiskId};
237+ }
238+ CheckReassignGroupDisk ();
239+ }
240+
241+ void Handle (TEvents::TEvUndelivered::TPtr ev) {
242+ if (const auto it = ActorToVDisk.find (ev->Sender ); it != ActorToVDisk.end ()) {
243+ Y_ABORT_UNLESS (ev->Get ()->SourceType == TEvBlobStorage::EvVStatus);
244+ OnVStatusError (it->second );
245+ }
246+ }
247+
248+ void OnVStatusError (TVDiskID vdiskId) {
249+ PendingVDiskIds.erase (vdiskId);
250+ CheckReassignGroupDisk ();
251+ }
252+
176253 void Handle (TEvNodeWardenBaseConfig::TPtr ev) {
254+ BaseConfig.emplace (std::move (ev->Get ()->BaseConfig ));
255+ CheckReassignGroupDisk ();
256+ }
257+
258+ void CheckReassignGroupDisk () {
259+ if (BaseConfig && PendingVDiskIds.empty ()) {
260+ ReassignGroupDiskExecute ();
261+ }
262+ }
263+
264+ void ReassignGroupDiskExecute () {
177265 const auto & record = Event->Get ()->Record ;
178266 const auto & cmd = record.GetReassignGroupDisk ();
179267
180- if (!RunCommonChecks ()) {
268+ if (Scepter.expired ()) {
269+ return FinishWithError (TResult::ERROR, " scepter lost during query execution" );
270+ } else if (!RunCommonChecks ()) {
181271 return ;
182272 }
183273
184- NKikimrBlobStorage::TStorageConfig config = *Self-> StorageConfig ;
274+ STLOG (PRI_DEBUG, BS_NODE, NW55, " ReassignGroupDiskExecute " , (SelfId, SelfId ())) ;
185275
186276 const auto & vdiskId = VDiskIDFromVDiskID (cmd.GetVDiskId ());
187277
278+ ui64 maxSlotSize = 0 ;
279+
280+ if (SuccessfulVDisks) {
281+ const auto & checker = GroupInfo->GetQuorumChecker ();
282+
283+ auto check = [&](auto failedVDisks, const char *base) {
284+ bool wasDegraded = checker.IsDegraded (failedVDisks) && checker.CheckFailModelForGroup (failedVDisks);
285+ failedVDisks |= {&GroupInfo->GetTopology (), vdiskId};
286+
287+ if (!checker.CheckFailModelForGroup (failedVDisks)) {
288+ FinishWithError (TResult::ERROR, TStringBuilder ()
289+ << " ReassignGroupDisk would render group inoperable (" << base << ' )' );
290+ } else if (!cmd.GetIgnoreDegradedGroupsChecks () && !wasDegraded && checker.IsDegraded (failedVDisks)) {
291+ FinishWithError (TResult::ERROR, TStringBuilder ()
292+ << " ReassignGroupDisk would drive group into degraded state (" << base << ' )' );
293+ } else {
294+ return true ;
295+ }
296+
297+ return false ;
298+ };
299+
300+ if (!check (~SuccessfulVDisks.value (), " polling" )) {
301+ return ;
302+ }
303+
304+ // scan failed disks according to BS_CONTROLLER's data
305+ TBlobStorageGroupInfo::TGroupVDisks failedVDisks (&GroupInfo->GetTopology ());
306+ for (const auto & vslot : BaseConfig->GetVSlot ()) {
307+ if (vslot.GetGroupId () != vdiskId.GroupID || vslot.GetGroupGeneration () != vdiskId.GroupGeneration ) {
308+ continue ;
309+ }
310+ if (!vslot.GetReady ()) {
311+ const TVDiskID vdiskId (vslot.GetGroupId (), vslot.GetGroupGeneration (), vslot.GetFailRealmIdx (),
312+ vslot.GetFailDomainIdx (), vslot.GetVDiskIdx ());
313+ failedVDisks |= {&GroupInfo->GetTopology (), vdiskId};
314+ }
315+ if (vslot.HasVDiskMetrics ()) {
316+ const auto & m = vslot.GetVDiskMetrics ();
317+ if (m.HasAllocatedSize ()) {
318+ maxSlotSize = Max (maxSlotSize, m.GetAllocatedSize ());
319+ }
320+ }
321+ }
322+
323+ if (!check (failedVDisks, " BS_CONTROLLER state" )) {
324+ return ;
325+ }
326+ }
327+
328+ NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig ;
329+
188330 if (!config.HasBlobStorageConfig ()) {
189331 return FinishWithError (TResult::ERROR, " no BlobStorageConfig defined" );
190332 }
@@ -227,11 +369,12 @@ namespace NKikimr::NStorage {
227369 try {
228370 Self->AllocateStaticGroup (&config, vdiskId.GroupID , vdiskId.GroupGeneration + 1 ,
229371 TBlobStorageGroupType ((TBlobStorageGroupType::EErasureSpecies)group.GetErasureSpecies ()),
230- settings.GetGeometry (), settings.GetPDiskFilter (), replacedDisks, forbid, 0 ,
231- &ev-> Get ()-> BaseConfig , cmd.GetConvertToDonor ());
372+ settings.GetGeometry (), settings.GetPDiskFilter (), replacedDisks, forbid, maxSlotSize ,
373+ &BaseConfig. value () , cmd.GetConvertToDonor ());
232374 } catch (const TExConfigError& ex) {
233- STLOG (PRI_NOTICE, BS_NODE, NW49, " ReassignGroupDisk failed to allocate group" , (Config, config),
234- (BaseConfig, ev->Get ()->BaseConfig ),
375+ STLOG (PRI_NOTICE, BS_NODE, NW49, " ReassignGroupDisk failed to allocate group" , (SelfId, SelfId ()),
376+ (Config, config),
377+ (BaseConfig, *BaseConfig),
235378 (Error, ex.what ()));
236379 return FinishWithError (TResult::ERROR, TStringBuilder () << " failed to allocate group: " << ex.what ());
237380 }
@@ -244,11 +387,14 @@ namespace NKikimr::NStorage {
244387 return FinishWithError (TResult::ERROR, TStringBuilder () << " group not found" );
245388 }
246389
247- void StaticVDiskSlain (const NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TStaticVDiskSlain& cmd) {
390+ // //////////////////////////////////////////////////////////////////////////////////////////////////////////////
391+ // VDiskSlain/DropDonor logic
392+
393+ void StaticVDiskSlain (const TQuery::TStaticVDiskSlain& cmd) {
248394 HandleDropDonorAndSlain (VDiskIDFromVDiskID (cmd.GetVDiskId ()), cmd.GetVSlotId (), false );
249395 }
250396
251- void DropDonor (const NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot ::TDropDonor& cmd) {
397+ void DropDonor (const TQuery ::TDropDonor& cmd) {
252398 HandleDropDonorAndSlain (VDiskIDFromVDiskID (cmd.GetVDiskId ()), cmd.GetVSlotId (), true );
253399 }
254400
@@ -365,7 +511,7 @@ namespace NKikimr::NStorage {
365511 UpdateFingerprint (config);
366512
367513 if (auto error = ValidateConfigUpdate (*Self->StorageConfig , *config)) {
368- STLOG (PRI_DEBUG, BS_NODE, NW51, " proposed config validation failed" , (Error, *error),
514+ STLOG (PRI_DEBUG, BS_NODE, NW51, " proposed config validation failed" , (SelfId, SelfId ()), ( Error, *error),
369515 (Config, config));
370516 return FinishWithError (TResult::ERROR, TStringBuilder () << " config validation failed: " << *error);
371517 }
@@ -434,14 +580,21 @@ namespace NKikimr::NStorage {
434580 TActorBootstrapped::PassAway ();
435581 }
436582
437- STRICT_STFUNC (StateFunc,
438- hFunc (TEvNodeConfigInvokeOnRootResult, Handle);
439- hFunc (TEvNodeConfigGather, Handle);
440- hFunc (TEvInterconnect::TEvNodeConnected, Handle);
441- hFunc (TEvInterconnect::TEvNodeDisconnected, Handle);
442- hFunc (TEvNodeWardenBaseConfig, Handle);
443- cFunc (TEvents::TSystem::Poison, PassAway);
444- )
583+ STFUNC (StateFunc) {
584+ if (LifetimeToken.expired ()) {
585+ return FinishWithError (TResult::ERROR, " distributed config keeper terminated" );
586+ }
587+ STRICT_STFUNC_BODY (
588+ hFunc (TEvNodeConfigInvokeOnRootResult, Handle);
589+ hFunc (TEvNodeConfigGather, Handle);
590+ hFunc (TEvInterconnect::TEvNodeConnected, Handle);
591+ hFunc (TEvInterconnect::TEvNodeDisconnected, Handle);
592+ hFunc (TEvBlobStorage::TEvVStatusResult, Handle);
593+ hFunc (TEvents::TEvUndelivered, Handle);
594+ hFunc (TEvNodeWardenBaseConfig, Handle);
595+ cFunc (TEvents::TSystem::Poison, PassAway);
596+ )
597+ }
445598 };
446599
447600 void TDistributedConfigKeeper::Handle (TEvNodeConfigInvokeOnRoot::TPtr ev) {
0 commit comments