@@ -92,6 +92,7 @@ struct TEvPrivate {
9292
9393ui64 PrintStatePeriodSec = 60 ;
9494ui64 MaxBatchSizeBytes = 10000000 ;
95+ ui64 MaxHandledEvents = 1000 ;
9596
9697TVector<TString> GetVector (const google::protobuf::RepeatedPtrField<TString>& value) {
9798 return {value.begin (), value.end ()};
@@ -227,6 +228,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
227228 cFunc (NActors::TEvents::TEvPoisonPill::EventType, PassAway);
228229 IgnoreFunc (NFq::TEvPrivate::TEvPqEventsReady);
229230 IgnoreFunc (NFq::TEvPrivate::TEvCreateSession);
231+ IgnoreFunc (NFq::TEvPrivate::TEvDataParsed);
230232 IgnoreFunc (NFq::TEvPrivate::TEvDataAfterFilteration);
231233 IgnoreFunc (NFq::TEvPrivate::TEvStatus);
232234 IgnoreFunc (NFq::TEvPrivate::TEvDataFiltered);
@@ -442,7 +444,7 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
442444}
443445
444446void TTopicSession::HandleNewEvents () {
445- while ( true ) {
447+ for (ui64 i = 0 ; i < MaxHandledEvents; ++i ) {
446448 if (!ReadSession) {
447449 return ;
448450 }
@@ -475,7 +477,6 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
475477 LOG_ROW_DISPATCHER_TRACE (" Data received: " << message.DebugString (true ));
476478
477479 TString item = message.GetData ();
478- item.Detach ();
479480 Self.SendToParsing (message.GetOffset (), item);
480481 Self.LastMessageOffset = message.GetOffset ();
481482 }
@@ -540,6 +541,10 @@ void TTopicSession::SendToParsing(ui64 offset, const TString& message) {
540541 }
541542 }
542543
544+ if (ClientsWithoutPredicate.size () == Clients.size ()) {
545+ return ;
546+ }
547+
543548 try {
544549 Parser->Push (offset, message);
545550 } catch (const std::exception& e) {
0 commit comments