Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 9 additions & 20 deletions ydb/core/base/statestorage_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1007,18 +1007,8 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {
};

class TStateStorageProxyStub : public TActor<TStateStorageProxyStub> {
std::deque<std::unique_ptr<IEventHandle>> PendingQ;

void Handle(TEvStateStorage::TEvLookup::TPtr &ev) {
BLOG_D("ProxyStub::Handle ev: " << ev->Get()->ToString());
const TEvStateStorage::TEvLookup *msg = ev->Get();
const ui64 tabletId = msg->TabletID;
const ui64 cookie = msg->Cookie;

Send(ev->Sender, new TEvStateStorage::TEvInfo(
NKikimrProto::ERROR,
tabletId, cookie, TActorId(), TActorId(), 0, 0, false, 0,
nullptr, 0, TMap<TActorId, TActorId>()));
}
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::SS_PROXY_STUB;
Expand All @@ -1028,15 +1018,14 @@ class TStateStorageProxyStub : public TActor<TStateStorageProxyStub> {
: TActor(&TThis::StateFunc)
{}

STATEFN(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvLookup, Handle);
default:
BLOG_W("ProxyStub::StateFunc unexpected event type# "
<< ev->GetTypeRewrite()
<< " event: "
<< ev->ToString());
break;
STFUNC(StateFunc) {
if (ev->GetTypeRewrite() == TEvents::TSystem::Poison) {
for (auto& q : PendingQ) {
TActivationContext::Send(q->Forward(ev->Sender));
}
PassAway();
} else {
PendingQ.emplace_back(ev.Release());
}
}
};
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "distconf.h"
#include "node_warden_impl.h"
#include <ydb/core/mind/dynamic_nameserver.h>

namespace NKikimr::NStorage {

Expand All @@ -16,8 +17,13 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Bootstrap() {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

// TODO: maybe extract list of nodes from the initial storage config?
Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
// report initial node listing
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
Send(SelfId(), ev.release());

// generate initial drive set and query stored configuration
EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
Expand Down
22 changes: 15 additions & 7 deletions ydb/core/blobstorage/nodewarden/node_warden_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,8 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf
const bool changedStateStorage = !StateStorageProxyConfigured || changed(*StateStorageInfo, *stateStorageInfo);
const bool changedBoard = !StateStorageProxyConfigured || changed(*BoardInfo, *boardInfo);
const bool changedSchemeBoard = !StateStorageProxyConfigured || changed(*SchemeBoardInfo, *schemeBoardInfo);
if (changedStateStorage || changedBoard || changedSchemeBoard) { // reconfigure proxy
STLOG(PRI_INFO, BS_NODE, NW50, "updating state storage proxy configuration");
Send(MakeStateStorageProxyID(), new TEvStateStorage::TEvUpdateGroupConfig(stateStorageInfo, boardInfo,
schemeBoardInfo));
StateStorageProxyConfigured = true;
} else { // no changes
return;
if (!changedStateStorage && !changedBoard && !changedSchemeBoard) {
return; // no changes
}

// start new replicas if needed
Expand Down Expand Up @@ -225,6 +220,19 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf
const TActorId actorId = as->RegisterLocalService(actorId, TActorId());
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0));
}

// reconfigure proxy
STLOG(PRI_INFO, BS_NODE, NW50, "updating state storage proxy configuration");
if (StateStorageProxyConfigured) {
Send(MakeStateStorageProxyID(), new TEvStateStorage::TEvUpdateGroupConfig(StateStorageInfo, BoardInfo,
SchemeBoardInfo));
} else {
const TActorId newInstance = as->Register(CreateStateStorageProxy(StateStorageInfo, BoardInfo, SchemeBoardInfo),
TMailboxType::ReadAsFilled, AppData()->SystemPoolId);
const TActorId stubInstance = as->RegisterLocalService(MakeStateStorageProxyID(), newInstance);
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, stubInstance, newInstance, nullptr, 0));
StateStorageProxyConfigured = true;
}
}

void TNodeWarden::ApplyStaticServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& ss) {
Expand Down