Skip to content

Commit 599ae6f

Browse files
committed
Fix after review
1 parent 3c22222 commit 599ae6f

File tree

5 files changed

+26
-23
lines changed

5 files changed

+26
-23
lines changed

ydb/core/mind/node_broker.cpp

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,12 @@ void TNodeBroker::SubscribeForConfigUpdates(const TActorContext &ctx)
687687

688688
void TNodeBroker::SendToSubscriber(const TSubscriberInfo &subscriber, IEventBase* event, const TActorContext &ctx) const
689689
{
690-
THolder<IEventHandle> ev = MakeHolder<IEventHandle>(subscriber.Id, ctx.SelfID, event);
690+
SendToSubscriber(subscriber, event, 0, ctx);
691+
}
692+
693+
void TNodeBroker::SendToSubscriber(const TSubscriberInfo &subscriber, IEventBase* event, ui64 cookie, const TActorContext &ctx) const
694+
{
695+
THolder<IEventHandle> ev = MakeHolder<IEventHandle>(subscriber.Id, ctx.SelfID, event, 0, cookie);
691696
if (subscriber.PipeServerInfo->IcSession) {
692697
ev->Rewrite(TEvInterconnect::EvForward, subscriber.PipeServerInfo->IcSession);
693698
}
@@ -708,10 +713,6 @@ void TNodeBroker::SendUpdateNodes(const TActorContext &ctx)
708713

709714
void TNodeBroker::SendUpdateNodes(const TSubscriberInfo &subscriber, ui64 version, const TActorContext &ctx)
710715
{
711-
if (version >= Committed.Epoch.Version) {
712-
return;
713-
}
714-
715716
NKikimrNodeBroker::TUpdateNodes record;
716717
record.SetSeqNo(subscriber.SeqNo);
717718
Committed.Epoch.Serialize(*record.MutableEpoch());
@@ -743,15 +744,15 @@ TNodeBroker::TSubscriberInfo& TNodeBroker::AddSubscriber(TActorId subscriberId,
743744

744745
auto& pipeServer = PipeServers.at(pipeServerId);
745746
auto res = Subscribers.emplace(subscriberId, TSubscriberInfo(subscriberId, seqNo, &pipeServer));
746-
Y_VERIFY_DEBUG_S(res.second, "Subscription already exists for " << subscriberId);
747+
Y_ENSURE(res.second, "Subscription already exists for " << subscriberId);
747748
pipeServer.Subscribers.insert(subscriberId);
748749
return res.first->second;
749750
}
750751

751752
void TNodeBroker::RemoveSubscriber(TActorId subscriber, const TActorContext &ctx)
752753
{
753754
auto it = Subscribers.find(subscriber);
754-
Y_VERIFY_DEBUG_S(it != Subscribers.end(), "No subscription for " << subscriber);
755+
Y_ENSURE(it != Subscribers.end(), "No subscription for " << subscriber);
755756

756757
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
757758
"Unsubscribed " << subscriber
@@ -1622,26 +1623,25 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvSyncNodesRequest::TPtr &ev,
16221623
TabletCounters->Cumulative()[COUNTER_SYNC_NODES_REQUESTS].Increment(1);
16231624

16241625
if (auto it = Subscribers.find(ev->Sender); it != Subscribers.end()) {
1625-
auto response = MakeHolder<TEvNodeBroker::TEvSyncNodesResponse>();
1626-
response->Record.SetSeqNo(it->second.SeqNo);
1627-
SendToSubscriber(it->second, response.Release(), ctx);
1628-
} else {
1629-
LOG_ERROR_S(ctx, NKikimrServices::NODE_BROKER,
1630-
"Unexpected TEvSyncNodesRequest without subscription from " << ev->Sender);
1626+
if (it->second.SeqNo == ev->Get()->Record.GetSeqNo()) {
1627+
auto response = MakeHolder<TEvNodeBroker::TEvSyncNodesResponse>();
1628+
response->Record.SetSeqNo(it->second.SeqNo);
1629+
SendToSubscriber(it->second, response.Release(), ev->Cookie, ctx);
1630+
}
16311631
}
16321632
}
16331633

16341634
void TNodeBroker::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev)
16351635
{
16361636
auto res = PipeServers.emplace(ev->Get()->ServerId, TPipeServerInfo(ev->Get()->ServerId, ev->Get()->InterconnectSession));
1637-
Y_VERIFY_DEBUG_S(res.second, "Unexpected TEvServerConnected for " << ev->Get()->ServerId);
1637+
Y_ENSURE(res.second, "Unexpected TEvServerConnected for " << ev->Get()->ServerId);
16381638
}
16391639

16401640
void TNodeBroker::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev,
16411641
const TActorContext &ctx)
16421642
{
16431643
auto it = PipeServers.find(ev->Get()->ServerId);
1644-
Y_VERIFY_DEBUG_S(it != PipeServers.end(), "Unexpected TEvServerDisconnected for " << ev->Get()->ServerId);
1644+
Y_ENSURE(it != PipeServers.end(), "Unexpected TEvServerDisconnected for " << ev->Get()->ServerId);
16451645
while (!it->second.Subscribers.empty()) {
16461646
RemoveSubscriber(*it->second.Subscribers.begin(), ctx);
16471647
}

ydb/core/mind/node_broker__migrate_state.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ class TNodeBroker::TTxMigrateState : public TTransactionBase<TNodeBroker> {
102102
Self->ScheduleEpochUpdate(ctx);
103103
Self->PrepareEpochCache();
104104
Self->PrepareUpdateNodesLog();
105-
Self->SignalTabletActive(ctx, "1.0");
105+
106+
NKikimrNodeBroker::TVersionInfo versionInfo;
107+
versionInfo.SetSupportDeltaProtocol(true);
108+
Self->SignalTabletActive(ctx, versionInfo.SerializeAsString());
106109
} else {
107110
Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges)));
108111
}

ydb/core/mind/node_broker_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ class TNodeBroker : public TActor<TNodeBroker>
286286
void SendUpdateNodes(const TActorContext &ctx);
287287
void SendUpdateNodes(const TSubscriberInfo &subscriber, ui64 version, const TActorContext &ctx);
288288
void SendToSubscriber(const TSubscriberInfo &subscriber, IEventBase* event, const TActorContext &ctx) const;
289+
void SendToSubscriber(const TSubscriberInfo &subscriber, IEventBase* event, ui64 cookie, const TActorContext &ctx) const;
289290

290291
TSubscriberInfo& AddSubscriber(TActorId subscriberId, TActorId pipeServerId, ui64 seqNo, const TActorContext &ctx);
291292
void RemoveSubscriber(TActorId subscriber, const TActorContext &ctx);

ydb/core/mind/node_broker_ut.cpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -842,13 +842,7 @@ void CheckUpdateNodesLog(TTestActorRuntime &runtime,
842842
static ui32 seqNo = 1;
843843
TActorId pipe = runtime.ConnectToPipe(MakeNodeBrokerID(), sender, 0, GetPipeConfigWithRetries());
844844
SubscribeToNodesUpdates(runtime, sender, pipe, version, ++seqNo);
845-
if (epoch.GetVersion() != version) {
846-
CheckNodesUpdate(runtime, updates, seqNo, epoch);
847-
} else {
848-
TBlockEvents<TEvNodeBroker::TEvUpdateNodes> block(runtime);
849-
runtime.SimulateSleep(TDuration::MilliSeconds(10));
850-
UNIT_ASSERT(block.empty());
851-
}
845+
CheckNodesUpdate(runtime, updates, seqNo, epoch);
852846
runtime.ClosePipe(pipe, sender, 0);
853847
}
854848

ydb/core/protos/node_broker.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ message TNodeInfoSchema {
3737
optional bool AuthorizedByCertificate = 10;
3838
}
3939

40+
message TVersionInfo {
41+
optional bool SupportDeltaProtocol = 1;
42+
}
43+
4044
message TEpoch {
4145
optional uint64 Id = 1;
4246
optional uint64 Version = 2;
@@ -164,6 +168,7 @@ message TUpdateNodes {
164168
}
165169

166170
message TSyncNodesRequest {
171+
optional uint64 SeqNo = 1;
167172
}
168173

169174
message TSyncNodesResponse {

0 commit comments

Comments
 (0)