Skip to content

Commit 2229e59

Browse files
authored
Merge 6e0cbfa into ce94a36
2 parents ce94a36 + 6e0cbfa commit 2229e59

File tree

5 files changed

+366
-57
lines changed

5 files changed

+366
-57
lines changed

ydb/core/base/tablet_pipe.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,17 @@ namespace NKikimr {
141141
, VersionInfo(std::move(versionInfo))
142142
{}
143143

144+
TEvClientConnected(const TEvClientConnected& other)
145+
: TabletId(other.TabletId)
146+
, Status(other.Status)
147+
, ClientId(other.ClientId)
148+
, ServerId(other.ServerId)
149+
, Leader(other.Leader)
150+
, Dead(other.Dead)
151+
, Generation(other.Generation)
152+
, VersionInfo(other.VersionInfo)
153+
{}
154+
144155
const ui64 TabletId;
145156
const NKikimrProto::EReplyStatus Status;
146157
const TActorId ClientId;
@@ -173,6 +184,12 @@ namespace NKikimr {
173184
, ServerId(serverId)
174185
{}
175186

187+
TEvClientDestroyed(const TEvClientDestroyed& other)
188+
: TabletId(other.TabletId)
189+
, ClientId(other.ClientId)
190+
, ServerId(other.ServerId)
191+
{}
192+
176193
const ui64 TabletId;
177194
const TActorId ClientId;
178195
const TActorId ServerId;

ydb/core/mind/dynamic_nameserver.cpp

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,43 +19,38 @@ static void ResetInterconnectProxyConfig(ui32 nodeId, const TActorContext &ctx)
1919

2020
void TDynamicNodeResolverBase::Bootstrap(const TActorContext &ctx)
2121
{
22-
auto dinfo = AppData(ctx)->DomainsInfo;
23-
24-
NTabletPipe::TClientRetryPolicy retryPolicy = {
25-
.RetryLimitCount = 12,
26-
.MinRetryTime = TDuration::MilliSeconds(50),
27-
.MaxRetryTime = TDuration::Seconds(2)
28-
};
22+
LOG_D("New cache miss: nodeId=" << NodeId << ", deadline=" << Deadline);
2923

30-
auto pipe = NTabletPipe::CreateClient(ctx.SelfID, MakeNodeBrokerID(), NTabletPipe::TClientConfig(retryPolicy));
31-
NodeBrokerPipe = ctx.RegisterWithSameMailbox(pipe);
24+
OpenPipe(ctx);
3225

3326
TAutoPtr<TEvNodeBroker::TEvResolveNode> request = new TEvNodeBroker::TEvResolveNode;
3427
request->Record.SetNodeId(NodeId);
35-
NTabletPipe::SendData(ctx, NodeBrokerPipe, request.Release());
28+
NTabletPipe::SendData(ctx, Config->NodeBrokerPipe, request.Release());
3629

3730
Become(&TDynamicNodeResolverBase::StateWork);
38-
if (Deadline != TInstant::Max()) {
39-
Schedule(Deadline, new TEvents::TEvWakeup);
31+
bool newEarliestDeadline = Config->PendingCacheMisses.empty() || Config->PendingCacheMisses.top().Deadline > Deadline;
32+
if (Deadline != TInstant::Max() && newEarliestDeadline) {
33+
LOG_D("Schedule wakeup for new earliest deadline " << Deadline);
34+
ctx.Schedule(Deadline, std::make_unique<IEventHandle>(Owner, TActorId(), new TEvents::TEvWakeup));
4035
}
36+
Config->PendingCacheMisses.emplace(SelfId(), Deadline);
4137
}
4238

43-
void TDynamicNodeResolverBase::Die(const TActorContext &ctx)
44-
{
45-
if (NodeBrokerPipe)
46-
NTabletPipe::CloseClient(ctx, NodeBrokerPipe);
47-
TBase::Die(ctx);
48-
}
49-
50-
void TDynamicNodeResolverBase::ReplyWithErrorAndDie(const TActorContext &ctx)
39+
void TDynamicNodeResolverBase::ReplyWithErrorAndDie(const TString& error, const TActorContext &ctx)
5140
{
41+
LOG_D("Cache miss failed: nodeId=" << NodeId << ", error=" << error);
5242
OnError(ctx);
5343
Die(ctx);
5444
}
5545

5646
void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev, const TActorContext &ctx)
5747
{
5848
auto &rec = ev->Get()->Record;
49+
50+
LOG_D("Handle TEvNodeBroker::TEvResolvedNode("
51+
<< "nodeId=" << NodeId
52+
<< ", status=" << rec.GetStatus().GetCode() << ")");
53+
5954
TDynamicConfig::TDynamicNodeInfo oldNode;
6055
auto it = Config->DynamicNodes.find(NodeId);
6156
bool exists = it != Config->DynamicNodes.end();
@@ -71,7 +66,7 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,
7166
ResetInterconnectProxyConfig(NodeId, ctx);
7267
ListNodesCache->Invalidate(); // node was erased
7368
}
74-
ReplyWithErrorAndDie(ctx);
69+
ReplyWithErrorAndDie("Error from NodeBroker", ctx);
7570
return;
7671
}
7772

@@ -85,14 +80,30 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,
8580
ResetInterconnectProxyConfig(NodeId, ctx);
8681
Config->DynamicNodes.emplace(NodeId, node);
8782

83+
LOG_D("Cache miss succeed: nodeId=" << NodeId);
8884
OnSuccess(ctx);
8985
Die(ctx);
9086
}
9187

9288
void TDynamicNodeResolverBase::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx)
9389
{
9490
if (ev->Get()->Status != NKikimrProto::OK)
95-
ReplyWithErrorAndDie(ctx);
91+
ReplyWithErrorAndDie("Pipe failed to connect", ctx);
92+
}
93+
94+
void TDynamicNodeResolverBase::HandleWakeup(const TActorContext &ctx) {
95+
ReplyWithErrorAndDie("Deadline exceeded", ctx);
96+
}
97+
98+
void TDynamicNodeResolverBase::HandleClientDestroyed(const TActorContext &ctx) {
99+
ReplyWithErrorAndDie("Pipe was destroyed", ctx);
100+
}
101+
102+
void TDynamicNodeResolverBase::OpenPipe(const TActorContext &ctx) {
103+
if (!Config->NodeBrokerPipe) {
104+
auto pipe = NTabletPipe::CreateClient(Owner, MakeNodeBrokerID());
105+
Config->NodeBrokerPipe = ctx.RegisterWithSameMailbox(pipe);
106+
}
96107
}
97108

98109
void TDynamicNodeResolver::OnSuccess(const TActorContext &ctx)
@@ -180,19 +191,19 @@ void TDynamicNameserver::ReplaceNameserverSetup(TIntrusivePtr<TTableNameserverSe
180191

181192
void TDynamicNameserver::Die(const TActorContext &ctx)
182193
{
183-
for (auto &pipe : NodeBrokerPipes) {
184-
if (pipe)
185-
NTabletPipe::CloseClient(ctx, pipe);
194+
for (auto &config : DynamicConfigs) {
195+
if (config->NodeBrokerPipe)
196+
NTabletPipe::CloseClient(ctx, config->NodeBrokerPipe);
186197
}
187198
TBase::Die(ctx);
188199
}
189200

190201
void TDynamicNameserver::OpenPipe(ui32 domain,
191202
const TActorContext &ctx)
192203
{
193-
if (!NodeBrokerPipes[domain]) {
204+
if (!DynamicConfigs[domain]->NodeBrokerPipe) {
194205
auto pipe = NTabletPipe::CreateClient(ctx.SelfID, MakeNodeBrokerID());
195-
NodeBrokerPipes[domain] = ctx.RegisterWithSameMailbox(pipe);
206+
DynamicConfigs[domain]->NodeBrokerPipe = ctx.RegisterWithSameMailbox(pipe);
196207
}
197208
}
198209

@@ -204,7 +215,7 @@ void TDynamicNameserver::RequestEpochUpdate(ui32 domain,
204215

205216
TAutoPtr<TEvNodeBroker::TEvListNodes> request = new TEvNodeBroker::TEvListNodes;
206217
request->Record.SetMinEpoch(epoch);
207-
NTabletPipe::SendData(ctx, NodeBrokerPipes[domain], request.Release());
218+
NTabletPipe::SendData(ctx, DynamicConfigs[domain]->NodeBrokerPipe, request.Release());
208219
EpochUpdates[domain] = epoch;
209220
}
210221

@@ -346,22 +357,29 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
346357
}
347358
}
348359

349-
void TDynamicNameserver::OnPipeDestroyed(ui32 domain,
350-
const TActorContext &ctx)
360+
template<typename TEv>
361+
void TDynamicNameserver::OnPipeDestroyed(TAutoPtr<TEventHandle<TEv>> &ev, ui32 domain, const TActorContext &ctx)
351362
{
352-
NodeBrokerPipes[domain] = TActorId();
363+
DynamicConfigs[domain]->NodeBrokerPipe = TActorId();
353364
PendingRequestAnswered(domain, ctx);
354365

355366
if (EpochUpdates.contains(domain)) {
356367
ctx.Schedule(TDuration::Seconds(1),
357368
new TEvPrivate::TEvUpdateEpoch(domain, EpochUpdates.at(domain)));
358369
EpochUpdates.erase(domain);
359370
}
371+
372+
for (const auto &cacheMiss : DynamicConfigs[domain]->PendingCacheMisses.Container()) {
373+
Send(cacheMiss.RequestActor, new TEv(*ev->Get()));
374+
}
375+
DynamicConfigs[domain]->PendingCacheMisses.clear();
360376
}
361377

362378
void TDynamicNameserver::Handle(TEvInterconnect::TEvResolveNode::TPtr &ev,
363379
const TActorContext &ctx)
364380
{
381+
LOG_D("Handle TEvInterconnect::TEvResolveNode(id=" << ev->Get()->Record.GetNodeId() << ")");
382+
365383
auto& record = ev->Get()->Record;
366384
const ui32 nodeId = record.GetNodeId();
367385
const TInstant deadline = record.HasDeadline() ? TInstant::FromValue(record.GetDeadline()) : TInstant::Max();
@@ -391,7 +409,7 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvListNodes::TPtr &ev,
391409
OpenPipe(domain, ctx);
392410
TAutoPtr<TEvNodeBroker::TEvListNodes> request = new TEvNodeBroker::TEvListNodes;
393411
request->Record.SetCachedVersion(DynamicConfigs[domain]->Epoch.Version);
394-
NTabletPipe::SendData(ctx, NodeBrokerPipes[domain], request.Release());
412+
NTabletPipe::SendData(ctx, DynamicConfigs[domain]->NodeBrokerPipe, request.Release());
395413
PendingRequests.Set(domain);
396414
}
397415
}
@@ -403,6 +421,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvListNodes::TPtr &ev,
403421

404422
void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TActorContext &ctx)
405423
{
424+
LOG_D("Handle TEvInterconnect::TEvGetNode(id=" << ev->Get()->NodeId << ")");
425+
406426
ui32 nodeId = ev->Get()->NodeId;
407427
THolder<TEvInterconnect::TEvNodeInfo> reply(new TEvInterconnect::TEvNodeInfo(nodeId));
408428
auto config = AppData(ctx)->DynamicNameserviceConfig;
@@ -435,18 +455,21 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
435455

436456
void TDynamicNameserver::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx)
437457
{
458+
LOG_D("Handle TEvTabletPipe::TEvClientDestroyed");
438459
ui32 domain = AppData()->DomainsInfo->GetDomain()->DomainUid;
439-
if (NodeBrokerPipes[domain] == ev->Get()->ClientId)
440-
OnPipeDestroyed(domain, ctx);
460+
if (DynamicConfigs[domain]->NodeBrokerPipe == ev->Get()->ClientId) {
461+
OnPipeDestroyed(ev, domain, ctx);
462+
}
441463
}
442464

443465
void TDynamicNameserver::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx)
444466
{
467+
LOG_D("Handle TEvTabletPipe::TEvClientConnected(status=" << ev->Get()->Status << ")");
445468
if (ev->Get()->Status != NKikimrProto::OK) {
446469
ui32 domain = AppData(ctx)->DomainsInfo->GetDomain()->DomainUid;
447-
if (NodeBrokerPipes[domain] == ev->Get()->ClientId) {
448-
NTabletPipe::CloseClient(ctx, NodeBrokerPipes[domain]);
449-
OnPipeDestroyed(domain, ctx);
470+
if (DynamicConfigs[domain]->NodeBrokerPipe == ev->Get()->ClientId) {
471+
NTabletPipe::CloseClient(ctx, DynamicConfigs[domain]->NodeBrokerPipe);
472+
OnPipeDestroyed(ev, domain, ctx);
450473
}
451474
}
452475
}
@@ -495,6 +518,26 @@ void TDynamicNameserver::Handle(TEvents::TEvUnsubscribe::TPtr ev) {
495518
StaticNodeChangeSubscribers.erase(ev->Sender);
496519
}
497520

521+
void TDynamicNameserver::HandleWakeup(const TActorContext &ctx) {
522+
auto now = ctx.Now();
523+
LOG_D("HandleWakeup at " << now);
524+
525+
ui32 domain = AppData()->DomainsInfo->GetDomain()->DomainUid;
526+
auto &pendingCacheMisses = DynamicConfigs[domain]->PendingCacheMisses;
527+
528+
while (!pendingCacheMisses.empty() && pendingCacheMisses.top().Deadline <= now) {
529+
const auto &top = pendingCacheMisses.top();
530+
Send(top.RequestActor, new TEvents::TEvWakeup);
531+
pendingCacheMisses.pop();
532+
}
533+
534+
if (!pendingCacheMisses.empty() && pendingCacheMisses.top().Deadline != TInstant::Max()) {
535+
auto deadline = pendingCacheMisses.top().Deadline;
536+
LOG_D("Schedule next wakeup at " << deadline);
537+
Schedule(deadline, new TEvents::TEvWakeup);
538+
}
539+
}
540+
498541
IActor *CreateDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 poolId) {
499542
return new TDynamicNameserver(setup, poolId);
500543
}

0 commit comments

Comments
 (0)