Skip to content

Commit 1f802f9

Browse files
authored
Merge 6115328 into fc062ee
2 parents fc062ee + 6115328 commit 1f802f9

File tree

12 files changed

+276
-138
lines changed

12 files changed

+276
-138
lines changed

ydb/core/blobstorage/base/blobstorage_events.cpp

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,10 +4,8 @@
44
namespace NKikimr {
55

66
TEvNodeWardenStorageConfig::TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
7-
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled,
8-
TBridgeInfo::TPtr bridgeInfo)
7+
bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo)
98
: Config(std::move(config))
10-
, ProposedConfig(std::move(proposedConfig))
119
, SelfManagementEnabled(selfManagementEnabled)
1210
, BridgeInfo(std::move(bridgeInfo))
1311
{}

ydb/core/blobstorage/base/blobstorage_events.h

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -592,13 +592,11 @@ namespace NKikimr {
592592
: TEventLocal<TEvNodeWardenStorageConfig, TEvBlobStorage::EvNodeWardenStorageConfig>
593593
{
594594
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> Config;
595-
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> ProposedConfig;
596595
bool SelfManagementEnabled;
597596
TBridgeInfo::TPtr BridgeInfo;
598597

599598
TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
600-
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled,
601-
TBridgeInfo::TPtr bridgeInfo);
599+
bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo);
602600
~TEvNodeWardenStorageConfig();
603601
};
604602

ydb/core/blobstorage/nodewarden/distconf.cpp

Lines changed: 28 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,7 @@ namespace NKikimr::NStorage {
130130
ProposedStorageConfig.reset();
131131
}
132132

133-
ReportStorageConfigToNodeWarden(0);
133+
ReportStorageConfigToNodeWarden();
134134

135135
if (IsSelfStatic) {
136136
PersistConfig({});
@@ -177,15 +177,7 @@ namespace NKikimr::NStorage {
177177
}
178178

179179
void TDistributedConfigKeeper::HandleConfigConfirm(STATEFN_SIG) {
180-
if (ev->Cookie) {
181-
STLOG(PRI_DEBUG, BS_NODE, NWDC46, "HandleConfigConfirm", (Cookie, ev->Cookie),
182-
(ProposedStorageConfigCookie, ProposedStorageConfigCookie),
183-
(ProposedStorageConfigCookieUsage, ProposedStorageConfigCookieUsage));
184-
if (ev->Cookie == ProposedStorageConfigCookie && ProposedStorageConfigCookieUsage) {
185-
--ProposedStorageConfigCookieUsage;
186-
}
187-
FinishAsyncOperation(ev->Cookie);
188-
}
180+
Y_UNUSED(ev);
189181
}
190182

191183
void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
@@ -233,13 +225,22 @@ namespace NKikimr::NStorage {
233225

234226
for (const auto& [cookie, task] : ScatterTasks) {
235227
for (const ui32 nodeId : task.PendingNodes) {
236-
const auto it = DirectBoundNodes.find(nodeId);
237-
Y_ABORT_UNLESS(it != DirectBoundNodes.end());
238-
TBoundNode& info = it->second;
239-
Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie));
228+
if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
229+
TBoundNode& info = it->second;
230+
Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie));
231+
} else {
232+
Y_ABORT_UNLESS(AddedNodesScatterTasks.contains({nodeId, cookie}));
233+
}
240234
}
241235
}
242236

237+
for (const auto& [nodeId, cookie] : AddedNodesScatterTasks) {
238+
const auto it = ScatterTasks.find(cookie);
239+
Y_ABORT_UNLESS(it != ScatterTasks.end());
240+
TScatterTask& task = it->second;
241+
Y_ABORT_UNLESS(task.PendingNodes.contains(nodeId));
242+
}
243+
243244
for (const auto& [nodeId, info] : DirectBoundNodes) {
244245
for (const ui64 cookie : info.ScatterTasks) {
245246
const auto it = ScatterTasks.find(cookie);
@@ -250,10 +251,12 @@ namespace NKikimr::NStorage {
250251
}
251252

252253
for (const auto& [cookie, task] : ScatterTasks) {
253-
if (task.Origin) {
254-
Y_ABORT_UNLESS(Binding);
255-
Y_ABORT_UNLESS(task.Origin == Binding);
256-
}
254+
std::visit(TOverloaded{
255+
[&](const TBinding& origin) { Y_ABORT_UNLESS(origin == Binding); },
256+
[&](const TActorId& /*actorId*/) { Y_ABORT_UNLESS(!Binding); },
257+
[&](const TScatterTaskOriginFsm&) {},
258+
[&](const TScatterTaskOriginTargeted&) {}
259+
}, task.Origin);
257260
}
258261

259262
for (const auto& [nodeId, subs] : SubscribedSessions) {
@@ -277,6 +280,10 @@ namespace NKikimr::NStorage {
277280
if (UnsubscribeQueue.contains(nodeId)) {
278281
okay = true;
279282
}
283+
if (!okay) {
284+
const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0});
285+
okay = it != AddedNodesScatterTasks.end() && std::get<0>(*it) == nodeId;
286+
}
280287
Y_ABORT_UNLESS(okay);
281288
if (subs.SubscriptionCookie) {
282289
const auto it = SubscriptionCookieMap.find(subs.SubscriptionCookie);
@@ -377,16 +384,12 @@ namespace NKikimr::NStorage {
377384
}
378385
}
379386

380-
void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden(ui64 cookie) {
387+
void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden() {
381388
Y_ABORT_UNLESS(StorageConfig);
382389
const TActorId wardenId = MakeBlobStorageNodeWardenID(SelfId().NodeId());
383390
const auto& config = SelfManagementEnabled ? StorageConfig : BaseConfig;
384-
auto proposedConfig = ProposedStorageConfig && SelfManagementEnabled
385-
? std::make_shared<NKikimrBlobStorage::TStorageConfig>(*ProposedStorageConfig)
386-
: nullptr;
387-
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, std::move(proposedConfig), SelfManagementEnabled,
388-
BridgeInfo);
389-
Send(wardenId, ev.release(), 0, cookie);
391+
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, SelfManagementEnabled, BridgeInfo);
392+
Send(wardenId, ev.release());
390393
}
391394

392395
STFUNC(TDistributedConfigKeeper::StateFunc) {

ydb/core/blobstorage/nodewarden/distconf.h

Lines changed: 29 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -172,22 +172,39 @@ namespace NKikimr::NStorage {
172172
ui64 Id = RandomNumber<ui64>(); // unique id
173173
};
174174

175+
struct TScatterTaskOriginFsm {
176+
TString ToString() const { return "fsm"; }
177+
};
178+
179+
struct TScatterTaskOriginTargeted {
180+
TActorId Sender;
181+
ui64 Cookie;
182+
TActorId InterconnectSessionId;
183+
184+
TString ToString() const { return TStringBuilder() << "{Sender# " << Sender << " Cookie# " << Cookie
185+
<< " InterconnectSessionId# " << InterconnectSessionId << '}'; }
186+
};
187+
188+
using TScatterTaskOrigin = std::variant<
189+
TBinding, // when scatter is received from root
190+
TActorId, // locally generated by invoke processor
191+
TScatterTaskOriginFsm, // locally generated by configuration change FSM
192+
TScatterTaskOriginTargeted // when targeted scatter is issued by cluster leader to newly added nodes
193+
>;
194+
175195
struct TScatterTask {
176-
const std::optional<TBinding> Origin;
196+
const TScatterTaskOrigin Origin;
177197
const ui64 ScepterCounter;
178-
const TActorId ActorId;
179198

180199
THashSet<ui32> PendingNodes;
181200
ui32 AsyncOperationsPending = 0;
182201
TEvScatter Request;
183202
TEvGather Response;
184203
std::vector<TEvGather> CollectedResponses; // from bound nodes
185204

186-
TScatterTask(const std::optional<TBinding>& origin, TEvScatter&& request,
187-
ui64 scepterCounter, TActorId actorId)
188-
: Origin(origin)
205+
TScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, ui64 scepterCounter)
206+
: Origin(std::move(origin))
189207
, ScepterCounter(scepterCounter)
190-
, ActorId(actorId)
191208
{
192209
Request.Swap(&request);
193210
if (Request.HasCookie()) {
@@ -218,8 +235,7 @@ namespace NKikimr::NStorage {
218235

219236
// proposed storage configuration of the cluster
220237
std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig; // proposed one
221-
ui64 ProposedStorageConfigCookie; // if set, then this configuration is being written right now
222-
ui32 ProposedStorageConfigCookieUsage = 0;
238+
std::optional<ui64> ProposedStorageConfigCookie; // if set, then this configuration is being written right now
223239

224240
// most relevant proposed config
225241
using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>;
@@ -266,6 +282,7 @@ namespace NKikimr::NStorage {
266282
ui64 NextScatterCookie = RandomNumber<ui64>();
267283
using TScatterTasks = THashMap<ui64, TScatterTask>;
268284
TScatterTasks ScatterTasks;
285+
std::set<std::tuple<ui32, ui64>> AddedNodesScatterTasks;
269286

270287
std::optional<TActorId> StateStorageSelfHealActor;
271288

@@ -284,6 +301,7 @@ namespace NKikimr::NStorage {
284301
NKikimrBlobStorage::TStorageConfig StorageConfig; // storage config being proposed
285302
TActorId ActorId; // actor id waiting for this operation to complete
286303
bool MindPrev; // mind previous configuration quorum
304+
std::vector<TNodeIdentifier> AddedNodes; // a list of nodes being added in this configuration change
287305
};
288306
std::optional<TProposition> CurrentProposition;
289307

@@ -348,7 +366,7 @@ namespace NKikimr::NStorage {
348366
void Halt(); // cease any distconf activity, unbind and reject any bindings
349367
bool ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config, bool fromBinding = false);
350368
void HandleConfigConfirm(STATEFN_SIG);
351-
void ReportStorageConfigToNodeWarden(ui64 cookie);
369+
void ReportStorageConfigToNodeWarden();
352370

353371
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
354372
// PDisk configuration retrieval and storing
@@ -411,6 +429,7 @@ namespace NKikimr::NStorage {
411429
void BecomeRoot();
412430
void UnbecomeRoot();
413431
void HandleErrorTimeout();
432+
void UndoCurrentPropositionNodeChange(TProposition& proposition);
414433
void ProcessGather(TEvGather *res);
415434
bool HasConnectedNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, bool local) const;
416435

@@ -487,7 +506,7 @@ namespace NKikimr::NStorage {
487506
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
488507
// Scatter/gather logic
489508

490-
void IssueScatterTask(std::optional<TActorId> actorId, TEvScatter&& request);
509+
void IssueScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, std::span<TNodeIdentifier> addedNodes = {});
491510
void CheckCompleteScatterTask(TScatterTasks::iterator it);
492511
void FinishAsyncOperation(ui64 cookie);
493512
void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task);

ydb/core/blobstorage/nodewarden/distconf_binding.cpp

Lines changed: 24 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,8 @@ namespace NKikimr::NStorage {
4949
break;
5050
}
5151
}
52-
Y_ABORT_UNLESS(found);
52+
auto fn = [](const auto& x) { return TStringBuilder() << std::get<0>(x); };
53+
Y_VERIFY_S(found, "SelfNodeId# " << selfNodeId << " NewNodeList# " << FormatList(newNodeList | std::views::transform(fn)));
5354

5455
// process all other nodes, find bindable ones (from our current pile) and build list of all nodes
5556
AllNodeIds.clear();
@@ -297,6 +298,13 @@ namespace NKikimr::NStorage {
297298
if (Binding && Binding->NodeId == nodeId) {
298299
AbortBinding("disconnection", false);
299300
}
301+
302+
// abort scatter tasks issued to newly added nodes
303+
for (auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() &&
304+
std::get<0>(*it) == nodeId; it = AddedNodesScatterTasks.erase(it)) {
305+
const auto& [nodeId, cookie] = *it;
306+
AbortScatterTask(cookie, nodeId);
307+
}
300308
}
301309

302310
void TDistributedConfigKeeper::UnsubscribeInterconnect(ui32 nodeId) {
@@ -309,6 +317,10 @@ namespace NKikimr::NStorage {
309317
if (ConnectedDynamicNodes.contains(nodeId)) {
310318
return;
311319
}
320+
if (const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() &&
321+
std::get<0>(*it) == nodeId) {
322+
return;
323+
}
312324
if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end()) {
313325
TSessionSubscription& subs = it->second;
314326
STLOG(PRI_DEBUG, BS_NODE, NWDC55, "UnsubscribeInterconnect", (NodeId, nodeId), (Subscription, subs));
@@ -319,11 +331,17 @@ namespace NKikimr::NStorage {
319331
Y_ABORT_UNLESS(jt != SubscriptionCookieMap.end());
320332
Y_ABORT_UNLESS(jt->second == nodeId);
321333
SubscriptionCookieMap.erase(jt);
334+
if (!AllNodeIds.contains(nodeId)) {
335+
TActivationContext::Send(new IEventHandle(TEvInterconnect::EvDisconnect, 0,
336+
TActivationContext::InterconnectProxy(nodeId), {}, nullptr, 0));
337+
}
322338
} else {
323339
// we already had TEvNodeConnected, so we have to unsubscribe
324340
Y_ABORT_UNLESS(subs.SessionId);
325-
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, subs.SessionId, SelfId(),
326-
nullptr, 0));
341+
ui32 event = AllNodeIds.contains(nodeId)
342+
? TEvents::TSystem::Unsubscribe
343+
: TEvents::TSystem::Poison;
344+
TActivationContext::Send(new IEventHandle(event, 0, subs.SessionId, SelfId(), nullptr, 0));
327345
}
328346
SubscribedSessions.erase(it);
329347
}
@@ -403,7 +421,8 @@ namespace NKikimr::NStorage {
403421
return; // possible race with unbinding
404422
}
405423

406-
Y_ABORT_UNLESS(Binding->RootNodeId || ScatterTasks.empty());
424+
auto isTargeted = [](const TScatterTaskOrigin& origin) { return std::holds_alternative<TScatterTaskOriginTargeted>(origin); };
425+
Y_ABORT_UNLESS(Binding->RootNodeId || std::ranges::all_of(ScatterTasks | std::views::values, isTargeted, &TScatterTask::Origin));
407426

408427
// check if this binding was accepted and if it is acceptable from our point of view
409428
bool bindingUpdate = false;
@@ -568,7 +587,7 @@ namespace NKikimr::NStorage {
568587

569588
if (!AllNodeIds.contains(senderNodeId)) {
570589
// node has been already deleted from the config, but new subscription is coming through -- ignoring it
571-
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer));
590+
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr));
572591
return;
573592
}
574593

0 commit comments

Comments
 (0)