Skip to content

Commit 9367424

Browse files
authored
YQ-3909 Shared reading: fix Ingress statistics (#11943)
1 parent dec33cc commit 9367424

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,9 +487,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&)
487487
if (!info.NextMessageOffset) {
488488
continue;
489489
}
490-
if (*info.NextMessageOffset <= info.LastSendedNextMessageOffset) {
491-
continue;
492-
}
493490
auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
494491
event->Record.SetPartitionId(PartitionId);
495492
event->Record.SetNextMessageOffset(*info.NextMessageOffset);

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,16 @@ class TFixture : public NUnitTest::TBaseFixture {
161161
return eventHolder->Get()->Record.MessagesSize();
162162
}
163163

164+
void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds) {
165+
size_t count = readActorIds.size();
166+
for (size_t i = 0; i < count; ++i) {
167+
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
168+
UNIT_ASSERT(eventHolder.Get() != nullptr);
169+
UNIT_ASSERT(readActorIds.contains(eventHolder->Get()->ReadActorId));
170+
readActorIds.erase(eventHolder->Get()->ReadActorId);
171+
}
172+
}
173+
164174
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
165175
NActors::TTestActorRuntime Runtime;
166176
TActorSystemStub ActorSystemStub;
@@ -194,6 +204,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
194204
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
195205
ExpectMessageBatch(ReadActorId1, { Json1 });
196206
ExpectMessageBatch(ReadActorId2, { Json1 });
207+
ExpectStatisticToReadActor({ReadActorId1, ReadActorId2});
197208

198209
auto source2 = BuildSource(topicName, false, "OtherConsumer");
199210
StartSession(ReadActorId3, source2);

0 commit comments

Comments
 (0)