File tree Expand file tree Collapse file tree 2 files changed +11
-3
lines changed
ydb/core/fq/libs/row_dispatcher Expand file tree Collapse file tree 2 files changed +11
-3
lines changed Original file line number Diff line number Diff line change @@ -485,9 +485,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&)
485485 if (!info.NextMessageOffset ) {
486486 continue ;
487487 }
488- if (*info.NextMessageOffset <= info.LastSendedNextMessageOffset ) {
489- continue ;
490- }
491488 auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
492489 event->Record .SetPartitionId (PartitionId);
493490 event->Record .SetNextMessageOffset (*info.NextMessageOffset );
Original file line number Diff line number Diff line change @@ -161,6 +161,16 @@ class TFixture : public NUnitTest::TBaseFixture {
161161 return eventHolder->Get ()->Record .MessagesSize ();
162162 }
163163
164+ void ExpectStatisticToReadActor (TSet<NActors::TActorId> readActorIds) {
165+ size_t count = readActorIds.size ();
166+ for (size_t i = 0 ; i < count; ++i) {
167+ auto eventHolder = Runtime.GrabEdgeEvent <TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds (GrabTimeoutSec));
168+ UNIT_ASSERT (eventHolder.Get () != nullptr );
169+ UNIT_ASSERT (readActorIds.contains (eventHolder->Get ()->ReadActorId ));
170+ readActorIds.erase (eventHolder->Get ()->ReadActorId );
171+ }
172+ }
173+
164174 IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
165175 NActors::TTestActorRuntime Runtime;
166176 TActorSystemStub ActorSystemStub;
@@ -194,6 +204,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
194204 ExpectNewDataArrived ({ReadActorId1, ReadActorId2});
195205 ExpectMessageBatch (ReadActorId1, { Json1 });
196206 ExpectMessageBatch (ReadActorId2, { Json1 });
207+ ExpectStatisticToReadActor ({ReadActorId1, ReadActorId2});
197208
198209 auto source2 = BuildSource (topicName, false , " OtherConsumer" );
199210 StartSession (ReadActorId3, source2);
You can’t perform that action at this time.
0 commit comments