Skip to content

Commit e03ce2a

Browse files
authored
Merge 932f91c into fa450dd
2 parents fa450dd + 932f91c commit e03ce2a

File tree

3 files changed

+21
-21
lines changed

3 files changed

+21
-21
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3682,12 +3682,13 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& e
36823682
auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
36833683
Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
36843684
"Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
3685-
size_t index = it->second;
3686-
auto& subscription = CoordinatorSubscriptions.at(index);
3687-
subscription.ReadStep = msg->ReadStep;
36883685
CoordinatorPrevReadStepMin = Max(CoordinatorPrevReadStepMin, msg->LastReadStep);
36893686
CoordinatorPrevReadStepMax = Min(CoordinatorPrevReadStepMax, msg->NextReadStep);
36903687
--CoordinatorSubscriptionsPending;
3688+
3689+
// Note: we don't use the subscription and unsubscribe immediately
3690+
ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnsubscribeReadStep(msg->CoordinatorId));
3691+
36913692
CheckMediatorStateRestored();
36923693
}
36933694

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2703,7 +2703,6 @@ class TDataShard
27032703

27042704
struct TCoordinatorSubscription {
27052705
ui64 CoordinatorId;
2706-
TMediatorTimecastReadStep::TCPtr ReadStep;
27072706
};
27082707

27092708
TVector<TCoordinatorSubscription> CoordinatorSubscriptions;

ydb/core/tx/time_cast/time_cast.cpp

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
namespace NKikimr {
1919

20-
// We will unsubscribe from idle coordinators after 5 minutes
21-
static constexpr TDuration MaxIdleCoordinatorSubscriptionTime = TDuration::Minutes(5);
22-
2320
ui64 TMediatorTimecastEntry::Get(ui64 tabletId) const {
2421
Y_UNUSED(tabletId);
2522
return AtomicGet(Step);
@@ -115,7 +112,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
115112
THashSet<TActorId> Subscribers;
116113
TMap<std::pair<ui64, TActorId>, ui64> SubscribeRequests; // (seqno, subscriber) -> Cookie
117114
TMap<std::pair<ui64, TActorId>, ui64> WaitRequests; // (step, subscriber) -> Cookie
118-
TMonotonic IdleStart;
115+
bool Unsubscribed = false;
119116
};
120117

121118
THashMap<ui64, TMediator> Mediators; // mediator tablet -> info
@@ -190,6 +187,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
190187

191188
void SyncCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
192189
void ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx);
190+
void UnsubscribeCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
193191
void NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx);
194192

195193
void Handle(TEvMediatorTimecast::TEvRegisterTablet::TPtr &ev, const TActorContext &ctx);
@@ -442,8 +440,9 @@ void TMediatorTimecastProxy::SyncCoordinator(ui64 coordinatorId, TCoordinator &c
442440
NTabletPipe::CreateClient(ctx.SelfID, coordinatorId, retryPolicy));
443441
}
444442

445-
coordinator.LastSentSeqNo = seqNo;
446443
NTabletPipe::SendData(ctx, coordinator.PipeClient, new TEvTxProxy::TEvSubscribeReadStep(coordinatorId, seqNo));
444+
coordinator.LastSentSeqNo = seqNo;
445+
coordinator.Unsubscribed = false;
447446
}
448447

449448
void TMediatorTimecastProxy::ResyncCoordinator(ui64 coordinatorId, const TActorId &pipeClient, const TActorContext &ctx) {
@@ -468,6 +467,17 @@ void TMediatorTimecastProxy::ResyncCoordinator(ui64 coordinatorId, const TActorI
468467
SyncCoordinator(coordinatorId, coordinator, ctx);
469468
}
470469

470+
void TMediatorTimecastProxy::UnsubscribeCoordinator(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) {
471+
if (coordinator.Unsubscribed || !coordinator.PipeClient) {
472+
return;
473+
}
474+
475+
// Note: we leave the pipe open to make new subscriptions faster
476+
// The idle entry will be removed when the pipe eventually disconnects
477+
NTabletPipe::SendData(ctx, coordinator.PipeClient, new TEvTxProxy::TEvUnsubscribeReadStep(coordinatorId, coordinator.LastSentSeqNo));
478+
coordinator.Unsubscribed = true;
479+
}
480+
471481
void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx) {
472482
const auto *msg = ev->Get();
473483
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID
@@ -496,7 +506,7 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep:
496506
auto &coordinator = Coordinators[coordinatorId];
497507
coordinator.Subscribers.erase(ev->Sender);
498508
if (coordinator.Subscribers.empty()) {
499-
coordinator.IdleStart = ctx.Monotonic();
509+
UnsubscribeCoordinator(coordinatorId, coordinator, ctx);
500510
}
501511
}
502512
subscriber.Coordinators.clear();
@@ -505,7 +515,7 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUnsubscribeReadStep:
505515
auto &coordinator = Coordinators[msg->CoordinatorId];
506516
coordinator.Subscribers.erase(ev->Sender);
507517
if (coordinator.Subscribers.empty()) {
508-
coordinator.IdleStart = ctx.Monotonic();
518+
UnsubscribeCoordinator(msg->CoordinatorId, coordinator, ctx);
509519
}
510520
subscriber.Coordinators.erase(msg->CoordinatorId);
511521
}
@@ -614,16 +624,6 @@ void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepUpdate::TPtr
614624

615625
NotifyCoordinatorWaiters(coordinatorId, coordinator, ctx);
616626
}
617-
618-
// Unsubscribe from idle coordinators
619-
if (coordinator.Subscribers.empty() && (ctx.Monotonic() - coordinator.IdleStart) >= MaxIdleCoordinatorSubscriptionTime) {
620-
if (coordinator.PipeClient) {
621-
NTabletPipe::CloseClient(ctx, coordinator.PipeClient);
622-
coordinator.PipeClient = TActorId();
623-
}
624-
625-
Coordinators.erase(itCoordinator);
626-
}
627627
}
628628

629629
void TMediatorTimecastProxy::NotifyCoordinatorWaiters(ui64 coordinatorId, TCoordinator &coordinator, const TActorContext &ctx) {

0 commit comments

Comments
 (0)