Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions ydb/core/mind/node_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,36 @@ void TNodeBroker::PrepareEpochCache()
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
}

void TNodeBroker::PrepareUpdateNodesLog()
{
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
"Preparing update nodes log for epoch #" << Committed.Epoch.ToString()
<< " nodes=" << Committed.Nodes.size()
<< " expired=" << Committed.ExpiredNodes.size()
<< " removed=" << Committed.RemovedNodes.size());

UpdateNodesLog.clear();
UpdateNodesLogVersions.clear();

TVector<TVersionedNodeID> nodeIdsSortedByVersion;
for (auto &entry : Committed.Nodes) {
nodeIdsSortedByVersion.emplace_back(entry.second.NodeId, entry.second.Version);
}
for (auto &entry : Committed.ExpiredNodes) {
nodeIdsSortedByVersion.emplace_back(entry.second.NodeId, entry.second.Version);
}
for (auto &entry : Committed.RemovedNodes) {
nodeIdsSortedByVersion.emplace_back(entry.second.NodeId, entry.second.Version);
}
std::sort(nodeIdsSortedByVersion.begin(), nodeIdsSortedByVersion.end(), TVersionedNodeID::TCmpByVersion());

for (const auto &id : nodeIdsSortedByVersion) {
const auto& node = *Committed.FindNode(id.NodeId);
AddNodeToUpdateNodesLog(node);
}
TabletCounters->Simple()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set(UpdateNodesLog.size());
}

void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node)
{
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
Expand Down Expand Up @@ -615,13 +645,132 @@ void TNodeBroker::AddDeltaToEpochDeltasCache(const TString &delta, ui64 version)
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
}

void TNodeBroker::AddNodeToUpdateNodesLog(const TNodeInfo &node)
{
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
"Add node " << node.IdShortString() << " to update nodes log");

NKikimrNodeBroker::TUpdateNodes updateNodes;

switch (node.State) {
case ENodeState::Active:
FillNodeInfo(node, *updateNodes.AddUpdates()->MutableNode());
break;
case ENodeState::Expired:
updateNodes.AddUpdates()->SetExpiredNode(node.NodeId);
break;
case ENodeState::Removed:
updateNodes.AddUpdates()->SetRemovedNode(node.NodeId);
break;
}

TString delta;
Y_PROTOBUF_SUPPRESS_NODISCARD updateNodes.SerializeToString(&delta);

Y_ENSURE(UpdateNodesLogVersions.empty() || UpdateNodesLogVersions.back().Version <= node.Version);
if (!UpdateNodesLogVersions.empty() && UpdateNodesLogVersions.back().Version == node.Version) {
UpdateNodesLog += delta;
UpdateNodesLogVersions.back().CacheEndOffset = UpdateNodesLog.size();
} else {
UpdateNodesLog += delta;
UpdateNodesLogVersions.emplace_back(node.Version, UpdateNodesLog.size());
}
TabletCounters->Simple()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set(UpdateNodesLog.size());
}

void TNodeBroker::SubscribeForConfigUpdates(const TActorContext &ctx)
{
ui32 nodeBrokerItem = (ui32)NKikimrConsole::TConfigItem::NodeBrokerConfigItem;
ui32 featureFlagsItem = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
NConsole::SubscribeViaConfigDispatcher(ctx, {nodeBrokerItem, featureFlagsItem}, ctx.SelfID);
}

void TNodeBroker::SendToSubscriber(const TSubscriberInfo &subscriber, IEventBase* event, const TActorContext &ctx) const
{
SendToSubscriber(subscriber, event, 0, ctx);
}

// Sends the event from this actor to subscriber.Id with the given cookie.
// When the subscriber's pipe server has an IcSession set, the event handle is
// rewritten to TEvInterconnect::EvForward addressed to that session
// (presumably so delivery goes through the same interconnect session the
// subscriber connected with — confirm against the pipe server code).
void TNodeBroker::SendToSubscriber(const TSubscriberInfo &subscriber, IEventBase* event, ui64 cookie, const TActorContext &ctx) const
{
    auto handle = MakeHolder<IEventHandle>(subscriber.Id, ctx.SelfID, event, 0, cookie);

    const auto &icSession = subscriber.PipeServerInfo->IcSession;
    if (icSession) {
        handle->Rewrite(TEvInterconnect::EvForward, icSession);
    }

    ctx.Send(handle.Release());
}

// Broadcasts to every subscriber the updates accumulated since SentVersion and
// then advances SentVersion to the committed epoch version. Does nothing when
// the committed version has already been sent.
void TNodeBroker::SendUpdateNodes(const TActorContext &ctx)
{
    const auto committedVersion = Committed.Epoch.Version;
    if (SentVersion < committedVersion) {
        for (const auto &[_, subscriber] : Subscribers) {
            SendUpdateNodes(subscriber, SentVersion, ctx);
        }
        SentVersion = committedVersion;
    }
}

// Sends a TEvUpdateNodes to one subscriber containing the committed epoch, the
// subscriber's seqNo and the pre-serialized tail of the update-nodes log with
// every update whose version is strictly greater than `version`.
//
// subscriber - receiver of the update.
// version    - the last version the receiver already knows about.
void TNodeBroker::SendUpdateNodes(const TSubscriberInfo &subscriber, ui64 version, const TActorContext &ctx)
{
    NKikimrNodeBroker::TUpdateNodes record;
    record.SetSeqNo(subscriber.SeqNo);
    Committed.Epoch.Serialize(*record.MutableEpoch());
    auto response = MakeHolder<TEvNodeBroker::TEvUpdateNodes>(record);

    // Find the first log entry that is newer than `version`. The search is
    // done on `version` itself rather than on `version + 1`: the latter wraps
    // around to 0 for the maximum ui64 value, which would resend the whole log
    // to a receiver claiming the largest possible version.
    const auto firstNewer = std::upper_bound(
        UpdateNodesLogVersions.begin(), UpdateNodesLogVersions.end(), version,
        [](ui64 knownVersion, const auto &entry) {
            return knownVersion < entry.Version;
        });

    if (firstNewer != UpdateNodesLogVersions.begin()) {
        // The preceding entry stores the offset where already known data ends.
        response->PreSerializedData = UpdateNodesLog.substr(std::prev(firstNewer)->CacheEndOffset);
    } else {
        // Everything in the log is newer than what the receiver has.
        response->PreSerializedData = UpdateNodesLog;
    }

    TabletCounters->Percentile()[COUNTER_UPDATE_NODES_BYTES].IncrementFor(response->GetCachedByteSize());
    LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER,
                "Send TEvUpdateNodes v" << version << " -> v" << Committed.Epoch.Version
                << " to " << subscriber.Id);
    SendToSubscriber(subscriber, response.Release(), ctx);
}

// Registers a new subscriber bound to an already known pipe server and returns
// a reference to the stored subscriber info.
//
// PipeServers.at() fails if the pipe server id is unknown, and Y_ENSURE fails
// if a subscription for subscriberId already exists.
TNodeBroker::TSubscriberInfo& TNodeBroker::AddSubscriber(TActorId subscriberId,
                                                         TActorId pipeServerId,
                                                         ui64 seqNo,
                                                         const TActorContext &ctx)
{
    LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
                "New subscriber " << subscriberId
                << ", seqNo: " << seqNo
                << ", server pipe id: " << pipeServerId);

    // Looked up first so that nothing is registered for an unknown pipe server.
    auto &serverInfo = PipeServers.at(pipeServerId);

    auto [position, inserted] = Subscribers.emplace(
        subscriberId, TSubscriberInfo(subscriberId, seqNo, &serverInfo));
    Y_ENSURE(inserted, "Subscription already exists for " << subscriberId);

    // Keep the reverse link so the pipe server knows its subscribers.
    serverInfo.Subscribers.insert(subscriberId);
    return position->second;
}

// Drops an existing subscription: removes the subscriber id from its pipe
// server's subscriber set and erases the subscriber record itself.
// Y_ENSURE fails if there is no subscription for the given actor.
void TNodeBroker::RemoveSubscriber(TActorId subscriber, const TActorContext &ctx)
{
    auto position = Subscribers.find(subscriber);
    Y_ENSURE(position != Subscribers.end(), "No subscription for " << subscriber);

    auto &info = position->second;

    LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
                "Unsubscribed " << subscriber
                << ", seqNo: " << info.SeqNo
                << ", server pipe id: " << info.PipeServerInfo->Id);

    // Unlink from the pipe server before the record (and `info`) goes away.
    info.PipeServerInfo->Subscribers.erase(subscriber);
    Subscribers.erase(position);
}

// Returns true when the actor already has a subscription whose seqNo is
// strictly less than newSeqNo; returns false when there is no subscription.
bool TNodeBroker::HasOutdatedSubscription(TActorId subscriber, ui64 newSeqNo) const
{
    const auto position = Subscribers.find(subscriber);
    return position != Subscribers.end() && position->second.SeqNo < newSeqNo;
}

void TNodeBroker::TState::LoadConfigFromProto(const NKikimrNodeBroker::TConfig &config)
{
Config = config;
Expand Down Expand Up @@ -1452,6 +1601,53 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvSetConfigRequest::TPtr &ev,
Execute(CreateTxUpdateConfig(ev), ctx);
}

// Handles a subscription request.
//
// An existing subscription of the sender with a smaller seqNo is replaced. If
// the sender still has a subscription after that check (its seqNo is not
// smaller than the requested one), the request is ignored. Otherwise the
// sender is subscribed via the pipe server identified by ev->Recipient and
// immediately receives the updates newer than its cached version.
void TNodeBroker::Handle(TEvNodeBroker::TEvSubscribeNodesRequest::TPtr &ev,
                         const TActorContext &ctx)
{
    TabletCounters->Cumulative()[COUNTER_SUBSCRIBE_NODES_REQUESTS].Increment(1);

    const auto &request = ev->Get()->Record;
    const auto seqNo = request.GetSeqNo();

    if (HasOutdatedSubscription(ev->Sender, seqNo)) {
        RemoveSubscriber(ev->Sender, ctx);
    }

    if (Subscribers.contains(ev->Sender)) {
        // The current subscription is not older than the request.
        return;
    }

    const auto &subscriber = AddSubscriber(ev->Sender, ev->Recipient, seqNo, ctx);
    SendUpdateNodes(subscriber, request.GetCachedVersion(), ctx);
}

// Handles a sync request: replies with TEvSyncNodesResponse (same seqNo, same
// cookie) only when the sender is subscribed and the request's seqNo matches
// the subscription. Requests from unknown senders or with a different seqNo
// are silently ignored.
void TNodeBroker::Handle(TEvNodeBroker::TEvSyncNodesRequest::TPtr &ev,
                         const TActorContext &ctx)
{
    TabletCounters->Cumulative()[COUNTER_SYNC_NODES_REQUESTS].Increment(1);

    const auto position = Subscribers.find(ev->Sender);
    if (position == Subscribers.end()) {
        return;
    }

    const auto &subscriber = position->second;
    if (subscriber.SeqNo != ev->Get()->Record.GetSeqNo()) {
        return;
    }

    auto reply = MakeHolder<TEvNodeBroker::TEvSyncNodesResponse>();
    reply->Record.SetSeqNo(subscriber.SeqNo);
    SendToSubscriber(subscriber, reply.Release(), ev->Cookie, ctx);
}

// Records a newly connected pipe server together with its interconnect
// session. Y_ENSURE fails if the server id is already registered.
void TNodeBroker::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev)
{
    const auto &msg = *ev->Get();

    const bool inserted = PipeServers.emplace(
        msg.ServerId, TPipeServerInfo(msg.ServerId, msg.InterconnectSession)).second;
    Y_ENSURE(inserted, "Unexpected TEvServerConnected for " << msg.ServerId);
}

// Forgets a disconnected pipe server: removes every subscriber attached to it
// and then erases the server record. Y_ENSURE fails if the server is unknown.
void TNodeBroker::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev,
                         const TActorContext &ctx)
{
    const auto &serverId = ev->Get()->ServerId;

    auto position = PipeServers.find(serverId);
    Y_ENSURE(position != PipeServers.end(), "Unexpected TEvServerDisconnected for " << serverId);

    // RemoveSubscriber erases the id from this very set, so take the first
    // element again on each iteration instead of iterating over the set.
    auto &attached = position->second.Subscribers;
    while (!attached.empty()) {
        RemoveSubscriber(*attached.begin(), ctx);
    }

    PipeServers.erase(position);
}

void TNodeBroker::Handle(TEvPrivate::TEvUpdateEpoch::TPtr &ev,
const TActorContext &ctx)
{
Expand Down
31 changes: 31 additions & 0 deletions ydb/core/mind/node_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ struct TEvNodeBroker {
EvGracefulShutdownRequest,
EvGracefulShutdownResponse,

// delta protocol
EvSubscribeNodesRequest,
EvUpdateNodes,
EvSyncNodesRequest,
EvSyncNodesResponse,

// TODO: remove
// internal
//EvNodeExpire = EvListNodes + 512,
Expand Down Expand Up @@ -204,6 +210,31 @@ struct TEvNodeBroker {
NKikimrNodeBroker::TSetConfigResponse,
EvSetConfigResponse> {
};

// Delta protocol: subscription request carrying
// NKikimrNodeBroker::TSubscribeNodesRequest.
struct TEvSubscribeNodesRequest
    : TEventPB<TEvSubscribeNodesRequest,
               NKikimrNodeBroker::TSubscribeNodesRequest,
               EvSubscribeNodesRequest>
{
};

// Delta protocol: update notification carrying NKikimrNodeBroker::TUpdateNodes.
// Derived from TEventPreSerializedPB, so it can be built from an existing
// record (and, presumably, extended with already serialized bytes).
struct TEvUpdateNodes
    : TEventPreSerializedPB<TEvUpdateNodes,
                            NKikimrNodeBroker::TUpdateNodes,
                            EvUpdateNodes>
{
    TEvUpdateNodes() = default;

    // Initializes the event from the given record.
    TEvUpdateNodes(const NKikimrNodeBroker::TUpdateNodes &record)
        : TEventPreSerializedPB(record)
    {
    }
};

// Delta protocol: sync request carrying NKikimrNodeBroker::TSyncNodesRequest.
struct TEvSyncNodesRequest
    : TEventPB<TEvSyncNodesRequest,
               NKikimrNodeBroker::TSyncNodesRequest,
               EvSyncNodesRequest>
{
};

// Delta protocol: sync response carrying NKikimrNodeBroker::TSyncNodesResponse.
struct TEvSyncNodesResponse
    : TEventPB<TEvSyncNodesResponse,
               NKikimrNodeBroker::TSyncNodesResponse,
               EvSyncNodesResponse>
{
};
};

constexpr ui32 DOMAIN_BITS = TDomainsInfo::DomainBits;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/mind/node_broker__extend_lease.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class TNodeBroker::TTxExtendLease : public TTransactionBase<TNodeBroker> {
Self->Committed.ExtendLease(node);
Self->Committed.UpdateEpochVersion();
Self->AddNodeToEpochCache(node);
Self->AddNodeToUpdateNodesLog(node);
}
Self->SendUpdateNodes(ctx);
}

private:
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/mind/node_broker__migrate_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,16 @@ class TNodeBroker::TTxMigrateState : public TTransactionBase<TNodeBroker> {

if (Finalized) {
Self->Committed = Self->Dirty;
Self->SentVersion = Self->Committed.Epoch.Version;
Self->Become(&TNodeBroker::StateWork);
Self->SubscribeForConfigUpdates(ctx);
Self->ScheduleEpochUpdate(ctx);
Self->PrepareEpochCache();
Self->SignalTabletActive(ctx);
Self->PrepareUpdateNodesLog();

NKikimrNodeBroker::TVersionInfo versionInfo;
versionInfo.SetSupportDeltaProtocol(true);
Self->SignalTabletActive(ctx, versionInfo.SerializeAsString());
} else {
Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges)));
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/mind/node_broker__register_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
if (ShouldUpdateVersion()) {
Self->Committed.UpdateEpochVersion();
Self->AddNodeToEpochCache(node);
Self->AddNodeToUpdateNodesLog(node);
}

Reply(ctx);
Self->SendUpdateNodes(ctx);
}

private:
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/mind/node_broker__update_epoch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class TNodeBroker::TTxUpdateEpoch : public TTransactionBase<TNodeBroker> {
Self->Committed.ApplyStateDiff(Diff);
Self->ScheduleEpochUpdate(ctx);
Self->PrepareEpochCache();
Self->PrepareUpdateNodesLog();
Self->ProcessDelayedListNodesRequests();
Self->SendUpdateNodes(ctx);
}

private:
Expand Down
Loading
Loading