Skip to content

Commit 0da0d6a

Browse files
authored
YQ-3583 Improvements after load tests / to stable (#10019)
1 parent afee6e2 commit 0da0d6a

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ struct TEvPrivate {
9292

9393
ui64 PrintStatePeriodSec = 60;
9494
ui64 MaxBatchSizeBytes = 10000000;
95+
ui64 MaxHandledEvents = 1000;
9596

9697
TVector<TString> GetVector(const google::protobuf::RepeatedPtrField<TString>& value) {
9798
return {value.begin(), value.end()};
@@ -227,6 +228,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
227228
cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway);
228229
IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady);
229230
IgnoreFunc(NFq::TEvPrivate::TEvCreateSession);
231+
IgnoreFunc(NFq::TEvPrivate::TEvDataParsed);
230232
IgnoreFunc(NFq::TEvPrivate::TEvDataAfterFilteration);
231233
IgnoreFunc(NFq::TEvPrivate::TEvStatus);
232234
IgnoreFunc(NFq::TEvPrivate::TEvDataFiltered);
@@ -442,7 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
442444
}
443445

444446
void TTopicSession::HandleNewEvents() {
445-
while (true) {
447+
for (ui64 i = 0; i < MaxHandledEvents; ++i) {
446448
if (!ReadSession) {
447449
return;
448450
}
@@ -475,7 +477,6 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
475477
LOG_ROW_DISPATCHER_TRACE("Data received: " << message.DebugString(true));
476478

477479
TString item = message.GetData();
478-
item.Detach();
479480
Self.SendToParsing(message.GetOffset(), item);
480481
Self.LastMessageOffset = message.GetOffset();
481482
}
@@ -540,6 +541,10 @@ void TTopicSession::SendToParsing(ui64 offset, const TString& message) {
540541
}
541542
}
542543

544+
if (ClientsWithoutPredicate.size() == Clients.size()) {
545+
return;
546+
}
547+
543548
try {
544549
Parser->Push(offset, message);
545550
} catch (const std::exception& e) {

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,27 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
171171
StopSession(ReadActorId2, source);
172172
}
173173

174+
Y_UNIT_TEST_F(TwoSessionWithoutPredicate, TFixture) {
175+
const TString topicName = "twowithoutpredicate";
176+
PQCreateStream(topicName);
177+
Init(topicName);
178+
auto source1 = BuildSource(topicName, true);
179+
auto source2 = BuildSource(topicName, true);
180+
StartSession(ReadActorId1, source1);
181+
StartSession(ReadActorId2, source2);
182+
183+
const std::vector<TString> data = { Json1 };
184+
PQWrite(data, topicName);
185+
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
186+
Runtime.Send(new IEventHandle(TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch()));
187+
Runtime.Send(new IEventHandle(TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch()));
188+
ExpectMessageBatch(ReadActorId1, { Json1 });
189+
ExpectMessageBatch(ReadActorId2, { Json1 });
190+
191+
StopSession(ReadActorId1, source1);
192+
StopSession(ReadActorId2, source2);
193+
}
194+
174195
Y_UNIT_TEST_F(SessionWithPredicateAndSessionWithoutPredicate, TFixture) {
175196
const TString topicName = "topic2";
176197
PQCreateStream(topicName);

0 commit comments

Comments
 (0)