|
| 1 | +#pragma once |
| 2 | + |
| 3 | +#include "distconf.h" |
| 4 | + |
| 5 | +namespace NKikimr::NStorage { |
| 6 | + |
| 7 | + class TDistributedConfigKeeper::TInvokeRequestHandlerActor : public TActorBootstrapped<TInvokeRequestHandlerActor> { |
| 8 | + TDistributedConfigKeeper* const Self; |
| 9 | + const std::weak_ptr<TLifetimeToken> LifetimeToken; |
| 10 | + const std::weak_ptr<TScepter> Scepter; |
| 11 | + std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>> Event; |
| 12 | + const TActorId Sender; |
| 13 | + const ui64 Cookie; |
| 14 | + const TActorId RequestSessionId; |
| 15 | + |
| 16 | + TActorId ParentId; |
| 17 | + ui32 WaitingReplyFromNode = 0; |
| 18 | + |
| 19 | + using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot; |
| 20 | + using TResult = NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult; |
| 21 | + |
| 22 | + using TGatherCallback = std::function<std::optional<TString>(TEvGather*)>; |
| 23 | + ui64 NextScatterCookie = 1; |
| 24 | + THashMap<ui64, TGatherCallback> ScatterTasks; |
| 25 | + |
| 26 | + std::shared_ptr<TLifetimeToken> RequestHandlerToken = std::make_shared<TLifetimeToken>(); |
| 27 | + |
| 28 | + public: |
| 29 | + TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev); |
| 30 | + |
| 31 | + void Bootstrap(TActorId parentId); |
| 32 | + |
| 33 | + void Handle(TEvNodeConfigInvokeOnRootResult::TPtr ev); |
| 34 | + |
| 35 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 36 | + // Interconnect machinery |
| 37 | + |
| 38 | + THashMap<ui32, TActorId> Subscriptions; |
| 39 | + |
| 40 | + void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev); |
| 41 | + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev); |
| 42 | + void UnsubscribeInterconnect(); |
| 43 | + |
| 44 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 45 | + // Query execution logic |
| 46 | + |
| 47 | + void ExecuteQuery(); |
| 48 | + void IssueScatterTask(TEvScatter&& task, TGatherCallback callback); |
| 49 | + void Handle(TEvNodeConfigGather::TPtr ev); |
| 50 | + |
| 51 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 52 | + // Configuration update |
| 53 | + |
| 54 | + void UpdateConfig(TQuery::TUpdateConfig *request); |
| 55 | + |
| 56 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 57 | + // Reassign group disk logic |
| 58 | + |
| 59 | + THashMultiMap<ui32, TVDiskID> NodeToVDisk; |
| 60 | + THashMap<TActorId, TVDiskID> ActorToVDisk; |
| 61 | + std::optional<NKikimrBlobStorage::TBaseConfig> BaseConfig; |
| 62 | + THashSet<TVDiskID> PendingVDiskIds; |
| 63 | + TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo; |
| 64 | + std::optional<TBlobStorageGroupInfo::TGroupVDisks> SuccessfulVDisks; |
| 65 | + |
| 66 | + void ReassignGroupDisk(const TQuery::TReassignGroupDisk& cmd); |
| 67 | + void IssueVStatusQueries(const NKikimrBlobStorage::TGroupInfo& group); |
| 68 | + void Handle(TEvBlobStorage::TEvVStatusResult::TPtr ev); |
| 69 | + void Handle(TEvents::TEvUndelivered::TPtr ev); |
| 70 | + void OnVStatusError(TVDiskID vdiskId); |
| 71 | + void Handle(TEvNodeWardenBaseConfig::TPtr ev); |
| 72 | + void CheckReassignGroupDisk(); |
| 73 | + void ReassignGroupDiskExecute(); |
| 74 | + |
| 75 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 76 | + // VDiskSlain/DropDonor logic |
| 77 | + |
| 78 | + void StaticVDiskSlain(const TQuery::TStaticVDiskSlain& cmd); |
| 79 | + void DropDonor(const TQuery::TDropDonor& cmd); |
| 80 | + void HandleDropDonorAndSlain(TVDiskID vdiskId, const NKikimrBlobStorage::TVSlotId& vslotId, bool isDropDonor); |
| 81 | + |
| 82 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 83 | + // State Storage operation |
| 84 | + |
| 85 | + void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd); |
| 86 | + |
| 87 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 88 | + // Storage configuration YAML manipulation |
| 89 | + |
| 90 | + NKikimrBlobStorage::TStorageConfig ProposedStorageConfig; |
| 91 | + |
| 92 | + std::optional<TString> NewYaml; |
| 93 | + std::optional<TString> NewStorageYaml; |
| 94 | + std::optional<ui64> MainYamlVersion; |
| 95 | + std::optional<ui64> StorageYamlVersion; |
| 96 | + |
| 97 | + TActorId ControllerPipeId; |
| 98 | + |
| 99 | + enum class EControllerOp { |
| 100 | + UNSET, |
| 101 | + ENABLE_DISTCONF, |
| 102 | + DISABLE_DISTCONF, |
| 103 | + OTHER, |
| 104 | + } ControllerOp = EControllerOp::UNSET; |
| 105 | + |
| 106 | + void FetchStorageConfig(bool manual, bool fetchMain, bool fetchStorage); |
| 107 | + void ReplaceStorageConfig(const TQuery::TReplaceStorageConfig& request); |
| 108 | + void ReplaceStorageConfigResume(const std::optional<TString>& storageConfigYaml, ui64 expectedMainYamlVersion, |
| 109 | + ui64 expectedStorageYamlVersion, bool enablingDistconf); |
| 110 | + void TryEnableDistconf(); |
| 111 | + void ConnectToController(); |
| 112 | + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); |
| 113 | + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); |
| 114 | + void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev); |
| 115 | + void Handle(TEvBlobStorage::TEvControllerDistconfResponse::TPtr ev); |
| 116 | + void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev); |
| 117 | + void BootstrapCluster(const TString& selfAssemblyUUID); |
| 118 | + |
| 119 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 120 | + // Configuration proposition |
| 121 | + |
| 122 | + void AdvanceGeneration(); |
| 123 | + void StartProposition(NKikimrBlobStorage::TStorageConfig *config, bool updateFields = true); |
| 124 | + |
| 125 | + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| 126 | + // Query termination and result delivery |
| 127 | + |
| 128 | + bool RunCommonChecks(); |
| 129 | + |
| 130 | + std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status, std::optional<TStringBuf> errorReason); |
| 131 | + void FinishWithError(TResult::EStatus status, const TString& errorReason); |
| 132 | + |
| 133 | + template<typename... TArgs> |
| 134 | + void Finish(TArgs&&... args) { |
| 135 | + auto handle = std::make_unique<IEventHandle>(std::forward<TArgs>(args)...); |
| 136 | + if (RequestSessionId) { // deliver response through interconnection session the request arrived from |
| 137 | + handle->Rewrite(TEvInterconnect::EvForward, RequestSessionId); |
| 138 | + } |
| 139 | + TActivationContext::Send(handle.release()); |
| 140 | + PassAway(); |
| 141 | + } |
| 142 | + |
| 143 | + void PassAway() override; |
| 144 | + |
| 145 | + STFUNC(StateFunc); |
| 146 | + }; |
| 147 | + |
| 148 | +} // NKikimr::NStorage |
0 commit comments