@@ -23,20 +23,23 @@ namespace {
2323// //////////////////////////////////////////////////////////////////////////////
2424
2525struct TTopicSessionMetrics {
26- void Init (const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, ui32 partitionId) {
26+ void Init (const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroup, ui32 partitionId) {
2727 TopicGroup = counters->GetSubgroup (" topic" , SanitizeLabel (topicPath));
28- AllSessionsDataRate = counters->GetCounter (" AllSessionsDataRate" , true );
28+ ReadGroup = TopicGroup->GetSubgroup (" read_group" , SanitizeLabel (readGroup));
29+ PartitionGroup = ReadGroup->GetSubgroup (" partition" , ToString (partitionId));
2930
30- PartitionGroup = TopicGroup-> GetSubgroup ( " partition " , ToString (partitionId) );
31+ AllSessionsDataRate = ReadGroup-> GetCounter ( " AllSessionsDataRate " , true );
3132 InFlyAsyncInputData = PartitionGroup->GetCounter (" InFlyAsyncInputData" );
3233 InFlySubscribe = PartitionGroup->GetCounter (" InFlySubscribe" );
3334 ReconnectRate = PartitionGroup->GetCounter (" ReconnectRate" , true );
34- RestartSessionByOffsets = counters ->GetCounter (" RestartSessionByOffsets" , true );
35+ RestartSessionByOffsets = PartitionGroup ->GetCounter (" RestartSessionByOffsets" , true );
3536 SessionDataRate = PartitionGroup->GetCounter (" SessionDataRate" , true );
3637 WaitEventTimeMs = PartitionGroup->GetHistogram (" WaitEventTimeMs" , NMonitoring::ExponentialHistogram (13 , 2 , 1 )); // ~ 1ms -> ~ 8s
38+ UnreadBytes = PartitionGroup->GetCounter (" UnreadBytes" );
3739 }
3840
3941 ::NMonitoring::TDynamicCounterPtr TopicGroup;
42+ ::NMonitoring::TDynamicCounterPtr ReadGroup;
4043 ::NMonitoring::TDynamicCounterPtr PartitionGroup;
4144 ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
4245 ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
@@ -45,6 +48,7 @@ struct TTopicSessionMetrics {
4548 ::NMonitoring::TDynamicCounters::TCounterPtr SessionDataRate;
4649 ::NMonitoring::THistogramPtr WaitEventTimeMs;
4750 ::NMonitoring::TDynamicCounters::TCounterPtr AllSessionsDataRate;
51+ ::NMonitoring::TDynamicCounters::TCounterPtr UnreadBytes;
4852};
4953
5054struct TEvPrivate {
@@ -106,9 +110,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
106110 }
107111 Y_UNUSED (TDuration::TryParse (Settings.GetSource ().GetReconnectPeriod (), ReconnectPeriod));
108112 auto queryGroup = Counters->GetSubgroup (" query_id" , ev->Get ()->Record .GetQueryId ());
109- auto topicGroup = queryGroup->GetSubgroup (" read_group" , SanitizeLabel (readGroup));
110- FilteredDataRate = topicGroup ->GetCounter (" FilteredDataRate" , true );
111- RestartSessionByOffsetsByQuery = counters ->GetCounter (" RestartSessionByOffsetsByQuery" , true );
113+ auto readSubGroup = queryGroup->GetSubgroup (" read_group" , SanitizeLabel (readGroup));
114+ FilteredDataRate = readSubGroup ->GetCounter (" FilteredDataRate" , true );
115+ RestartSessionByOffsetsByQuery = readSubGroup ->GetCounter (" RestartSessionByOffsetsByQuery" , true );
112116 }
113117
114118 ~TClientsInfo () {
@@ -162,6 +166,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
162166 UnreadBytes += rowSize;
163167 Self.UnreadBytes += rowSize;
164168 Self.SendDataArrived (*this );
169+ Self.Metrics .UnreadBytes ->Add (rowSize);
165170 }
166171
167172 void UpdateClientOffset (ui64 offset) override {
@@ -305,6 +310,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
305310 void SendStatisticToRowDispatcher ();
306311 void SendSessionError (TActorId readActorId, TStatus status);
307312 bool CheckNewClient (NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
313+ void RestartSessionIfOldestClient (const TClientsInfo& info);
308314
309315private:
310316
@@ -369,7 +375,7 @@ TTopicSession::TTopicSession(
369375
370376void TTopicSession::Bootstrap () {
371377 Become (&TTopicSession::StateFunc);
372- Metrics.Init (Counters, TopicPath, PartitionId);
378+ Metrics.Init (Counters, TopicPath, ReadGroup, PartitionId);
373379 LogPrefix = LogPrefix + " " + SelfId ().ToString () + " " ;
374380 LOG_ROW_DISPATCHER_DEBUG (" Bootstrap " << TopicPathPartition
375381 << " , Timeout " << Config.GetTimeoutBeforeStartSessionSec () << " sec, StatusPeriod " << Config.GetSendStatusPeriodSec () << " sec" );
@@ -683,6 +689,7 @@ void TTopicSession::SendData(TClientsInfo& info) {
683689 } while (!buffer.empty ());
684690
685691 UnreadBytes -= info.UnreadBytes ;
692+ Metrics.UnreadBytes ->Sub (info.UnreadBytes );
686693 info.UnreadRows = 0 ;
687694 info.UnreadBytes = 0 ;
688695
@@ -744,11 +751,14 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
744751
745752 auto it = Clients.find (ev->Sender );
746753 if (it == Clients.end ()) {
747- LOG_ROW_DISPATCHER_DEBUG ( " Wrong ClientSettings " );
754+ LOG_ROW_DISPATCHER_WARN ( " Ignore TEvStopSession from " << ev-> Sender << " , no client " );
748755 return ;
749756 }
750757 auto & info = *it->second ;
758+ RestartSessionIfOldestClient (info);
759+
751760 UnreadBytes -= info.UnreadBytes ;
761+ Metrics.UnreadBytes ->Sub (info.UnreadBytes );
752762 if (const auto formatIt = FormatHandlers.find (info.HandlerSettings ); formatIt != FormatHandlers.end ()) {
753763 formatIt->second ->RemoveClient (info.GetClientId ());
754764 if (!formatIt->second ->HasClients ()) {
@@ -762,6 +772,41 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
762772 SubscribeOnNextEvent ();
763773}
764774
775+ void TTopicSession::RestartSessionIfOldestClient (const TClientsInfo& info) {
776+ // if we read historical data (because of this client), then we restart the session.
777+
778+ if (!ReadSession || !info.NextMessageOffset ) {
779+ return ;
780+ }
781+ TMaybe<ui64> minMessageOffset;
782+ for (auto & [readActorId, clientPtr] : Clients) {
783+ if (info.ReadActorId == readActorId || !clientPtr->NextMessageOffset ) {
784+ continue ;
785+ }
786+ if (!minMessageOffset) {
787+ minMessageOffset = clientPtr->NextMessageOffset ;
788+ continue ;
789+ }
790+ minMessageOffset = std::min (minMessageOffset, clientPtr->NextMessageOffset );
791+ }
792+ if (!minMessageOffset) {
793+ return ;
794+ }
795+
796+ if (info.NextMessageOffset >= minMessageOffset) {
797+ return ;
798+ }
799+ LOG_ROW_DISPATCHER_INFO (" Client (on StopSession) has less offset (" << info.NextMessageOffset << " ) than others clients (" << minMessageOffset << " ), stop (restart) topic session" );
800+ Metrics.RestartSessionByOffsets ->Inc ();
801+ ++RestartSessionByOffsets;
802+ info.RestartSessionByOffsetsByQuery ->Inc ();
803+ StopReadSession ();
804+
805+ if (!ReadSession) {
806+ Schedule (TDuration::Seconds (Config.GetTimeoutBeforeStartSessionSec ()), new NFq::TEvPrivate::TEvCreateSession ());
807+ }
808+ }
809+
765810void TTopicSession::FatalError (TStatus status) {
766811 LOG_ROW_DISPATCHER_ERROR (" FatalError: " << status.GetErrorMessage ());
767812
0 commit comments