Skip to content

Commit 347ec7d

Browse files
authored
Merge 45034bb into 7f1f1ae
2 parents 7f1f1ae + 45034bb commit 347ec7d

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ void TDqPqRdReadActor::ProcessGlobalState() {
390390
if (!CoordinatorActorId) {
391391
SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId());
392392
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
393-
State = EState::WAIT_COORDINATOR_ID;
393+
State = EState::WAIT_COORDINATOR_ID;
394394
}
395395
[[fallthrough]];
396396
case EState::WAIT_COORDINATOR_ID: {
@@ -664,7 +664,7 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartb
664664
bool needSend = sessionInfo.EventsQueue.Heartbeat();
665665
if (needSend) {
666666
SRC_LOG_T("Send TEvEvHeartbeat");
667-
Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(), sessionInfo.Generation);
667+
Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(), IEventHandle::FlagTrackDelivery, sessionInfo.Generation);
668668
}
669669
}
670670

@@ -764,7 +764,7 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::
764764
}
765765

766766
void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
767-
SRC_LOG_D("Received TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << ", reason " << ev->Get()->Reason);
767+
SRC_LOG_D("Received TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << ", reason " << ev->Get()->Reason << ", cookie " << ev->Cookie);
768768
Counters.Undelivered++;
769769

770770
auto sessionIt = Sessions.find(ev->Sender);

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@ void TDqPqReadActorBase::SaveState(const NDqProto::TCheckpoint& /*checkpoint*/,
2929
topic->SetDatabase(SourceParams.GetDatabase());
3030
topic->SetTopicPath(SourceParams.GetTopicPath());
3131

32+
TStringStream str;
33+
str << "SessionId: " << GetSessionId() << " SaveState, offsets: ";
3234
for (const auto& [clusterAndPartition, offset] : PartitionToOffset) {
3335
const auto& [cluster, partition] = clusterAndPartition;
3436
NPq::NProto::TDqPqTopicSourceState::TPartitionReadState* partitionState = stateProto.AddPartitions();
3537
partitionState->SetTopicIndex(0); // Now we are supporting only one topic per source.
3638
partitionState->SetCluster(cluster);
3739
partitionState->SetPartition(partition);
3840
partitionState->SetOffset(offset);
39-
SRC_LOG_D("SessionId: " << GetSessionId() << " SaveState: partition " << partition << ", offset: " << offset);
41+
str << "{" << partition << "," << offset << "},";
4042
}
43+
SRC_LOG_D(str.Str());
4144

4245
stateProto.SetStartingMessageTimestampMs(StartingMessageTimestamp.MilliSeconds());
4346
stateProto.SetIngressBytes(IngressStats.Bytes);

0 commit comments

Comments
 (0)