@@ -162,9 +162,10 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
162162
163163 ESessionStatus Status = ESessionStatus::NoSession;
164164 ui64 NextOffset = 0 ;
165- bool IsWaitingRowDispatcherResponse = false ;
165+ bool IsWaitingStartSessionAck = false ;
166166 NYql::NDq::TRetryEventsQueue EventsQueue;
167167 bool HasPendingData = false ;
168+ bool IsWaitingMessageBatch = false ;
168169 TActorId RowDispatcherActorId;
169170 ui64 PartitionId;
170171 ui64 Generation;
@@ -276,12 +277,14 @@ TDqPqRdReadActor::TDqPqRdReadActor(
276277 }
277278
278279 IngressStats.Level = statsLevel;
279- SRC_LOG_D (" Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString () << " , metadatafields: " << JoinSeq (' ,' , SourceParams.GetMetadataFields ()));
280+ SRC_LOG_I (" Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString () << " , metadatafields: " << JoinSeq (' ,' , SourceParams.GetMetadataFields ()));
280281}
281282
282283void TDqPqRdReadActor::ProcessState () {
283284 switch (State) {
284285 case EState::INIT:
286+ LogPrefix = (TStringBuilder () << " SelfId: " << SelfId () << " , TxId: " << TxId << " , task: " << TaskId << " . PQ source. " );
287+
285288 if (!ReadyBuffer.empty ()) {
286289 return ;
287290 }
@@ -290,7 +293,7 @@ void TDqPqRdReadActor::ProcessState() {
290293 Schedule (TDuration::Seconds (ProcessStatePeriodSec), new TEvPrivate::TEvProcessState ());
291294 }
292295 if (!CoordinatorActorId) {
293- SRC_LOG_D (" Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId ());
296+ SRC_LOG_I (" Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId ());
294297 Send (LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe ());
295298 if (!SchedulePrintStatePeriod) {
296299 SchedulePrintStatePeriod = true ;
@@ -305,7 +308,7 @@ void TDqPqRdReadActor::ProcessState() {
305308 }
306309 State = EState::WAIT_PARTITIONS_ADDRES;
307310 auto partitionToRead = GetPartitionsToRead ();
308- SRC_LOG_D (" Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString () << " , partIds: " << JoinSeq (" , " , partitionToRead));
311+ SRC_LOG_I (" Send TEvCoordinatorRequest to coordinator " << CoordinatorActorId->ToString () << " , partIds: " << JoinSeq (" , " , partitionToRead));
309312 Send (
310313 *CoordinatorActorId,
311314 new NFq::TEvRowDispatcher::TEvCoordinatorRequest (SourceParams, partitionToRead),
@@ -328,7 +331,7 @@ void TDqPqRdReadActor::ProcessState() {
328331 readOffset = offsetIt->second ;
329332 }
330333
331- SRC_LOG_D (" Send TEvStartSession to " << sessionInfo.RowDispatcherActorId
334+ SRC_LOG_I (" Send TEvStartSession to " << sessionInfo.RowDispatcherActorId
332335 << " , offset " << readOffset
333336 << " , partitionId " << partitionId
334337 << " , connection id " << sessionInfo.Generation );
@@ -341,7 +344,7 @@ void TDqPqRdReadActor::ProcessState() {
341344 StartingMessageTimestamp.MilliSeconds (),
342345 std::visit ([](auto arg) { return ToString (arg); }, TxId));
343346 sessionInfo.EventsQueue .Send (event, sessionInfo.Generation );
344- sessionInfo.IsWaitingRowDispatcherResponse = true ;
347+ sessionInfo.IsWaitingStartSessionAck = true ;
345348 sessionInfo.Status = SessionInfo::ESessionStatus::Started;
346349 }
347350 }
@@ -365,14 +368,14 @@ void TDqPqRdReadActor::StopSessions() {
365368 auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
366369 *event->Record .MutableSource () = SourceParams;
367370 event->Record .SetPartitionId (partitionId);
368- SRC_LOG_D (" Send StopSession to " << sessionInfo.RowDispatcherActorId );
371+ SRC_LOG_I (" Send StopSession to " << sessionInfo.RowDispatcherActorId );
369372 sessionInfo.EventsQueue .Send (event.release (), sessionInfo.Generation );
370373 }
371374}
372375
373376// IActor & IDqComputeActorAsyncInput
374377void TDqPqRdReadActor::PassAway () { // Is called from Compute Actor
375- SRC_LOG_D (" PassAway" );
378+ SRC_LOG_I (" PassAway" );
376379 PrintInternalState ();
377380 StopSessions ();
378381 TActor<TDqPqRdReadActor>::PassAway ();
@@ -430,7 +433,7 @@ std::vector<ui64> TDqPqRdReadActor::GetPartitionsToRead() const {
430433
431434void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& ev) {
432435 const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get ()->Record .GetTransportMeta ();
433- SRC_LOG_D (" TEvStartSessionAck from " << ev->Sender << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
436+ SRC_LOG_I (" TEvStartSessionAck from " << ev->Sender << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
434437
435438 ui64 partitionId = ev->Get ()->Record .GetConsumer ().GetPartitionId ();
436439 auto sessionIt = Sessions.find (partitionId);
@@ -444,11 +447,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e
444447 if (!CheckSession (sessionInfo, ev, partitionId)) {
445448 return ;
446449 }
450+ sessionInfo.IsWaitingStartSessionAck = false ;
447451}
448452
449453void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
450454 const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get ()->Record .GetTransportMeta ();
451- SRC_LOG_D (" TEvSessionError from " << ev->Sender << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
455+ SRC_LOG_I (" TEvSessionError from " << ev->Sender << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
452456
453457 ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
454458 auto sessionIt = Sessions.find (partitionId);
@@ -581,6 +585,7 @@ void TDqPqRdReadActor::ReInit(const TString& reason) {
581585 if (!ReadyBuffer.empty ()) {
582586 Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex));
583587 }
588+ PrintInternalState ();
584589}
585590
586591void TDqPqRdReadActor::Stop (const TString& message) {
@@ -674,6 +679,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
674679 activeBatch.UsedSpace = bytes;
675680 ReadyBufferSizeBytes += bytes;
676681 activeBatch.NextOffset = ev->Get ()->Record .GetNextMessageOffset ();
682+ sessionInfo.IsWaitingMessageBatch = false ;
677683 Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex));
678684}
679685
@@ -716,7 +722,8 @@ void TDqPqRdReadActor::PrintInternalState() {
716722 str << " State:\n " ;
717723 for (auto & [partitionId, sessionInfo] : Sessions) {
718724 str << " partId " << partitionId << " status " << static_cast <ui64>(sessionInfo.Status )
719- << " next offset " << sessionInfo.NextOffset << " is waiting " << sessionInfo.IsWaitingRowDispatcherResponse
725+ << " next offset " << sessionInfo.NextOffset << " used buffer size " << ReadyBufferSizeBytes
726+ << " is waiting ack " << sessionInfo.IsWaitingStartSessionAck << " is waiting batch " << sessionInfo.IsWaitingMessageBatch
720727 << " has pending data " << sessionInfo.HasPendingData << " connection id " << sessionInfo.Generation << " " ;
721728 sessionInfo.EventsQueue .PrintInternalState (str);
722729 }
@@ -738,6 +745,7 @@ void TDqPqRdReadActor::TrySendGetNextBatch(SessionInfo& sessionInfo) {
738745 Metrics.InFlyGetNextBatch ->Inc ();
739746 auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
740747 sessionInfo.HasPendingData = false ;
748+ sessionInfo.IsWaitingMessageBatch = true ;
741749 event->Record .SetPartitionId (sessionInfo.PartitionId );
742750 sessionInfo.EventsQueue .Send (event.release (), sessionInfo.Generation );
743751}
0 commit comments