@@ -91,24 +91,63 @@ struct TEvPrivate {
9191} // namespace
9292
9393class TDqPqWriteActor : public NActors ::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput {
94+ struct TMetrics {
95+ TMetrics (const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
96+ : TxId(std::visit([](auto arg) { return ToString (arg); }, txId))
97+ , Counters(counters) {
98+ SubGroup = Counters->GetSubgroup (" sink" , " PqSink" );
99+ auto sink = SubGroup->GetSubgroup (" tx_id" , TxId);
100+ auto task = sink->GetSubgroup (" task_id" , ToString (taskId));
101+ LastAckLatency = task->GetCounter (" LastAckLatencyMs" );
102+ InFlyCheckpoints = task->GetCounter (" InFlyCheckpoints" );
103+ InFlyData = task->GetCounter (" InFlyData" );
104+ AlreadyWritten = task->GetCounter (" AlreadWritten" );
105+ }
106+
107+ ~TMetrics () {
108+ SubGroup->RemoveSubgroup (" id" , TxId);
109+ }
110+
111+ TString TxId;
112+ ::NMonitoring::TDynamicCounterPtr Counters;
113+ ::NMonitoring::TDynamicCounterPtr SubGroup;
114+ ::NMonitoring::TDynamicCounters::TCounterPtr LastAckLatency;
115+ ::NMonitoring::TDynamicCounters::TCounterPtr InFlyCheckpoints;
116+ ::NMonitoring::TDynamicCounters::TCounterPtr InFlyData;
117+ ::NMonitoring::TDynamicCounters::TCounterPtr AlreadyWritten;
118+ };
119+
120+ struct TAckInfo {
121+ TAckInfo (i64 messageSize, const TInstant& startTime)
122+ : MessageSize(messageSize)
123+ , StartTime(startTime)
124+ {}
125+
126+ i64 MessageSize = 0 ;
127+ TInstant StartTime;
128+ };
129+
94130public:
95131 TDqPqWriteActor (
96132 ui64 outputIndex,
97133 TCollectStatsLevel statsLevel,
98134 const TTxId& txId,
135+ ui64 taskId,
99136 NPq::NProto::TDqPqTopicSink&& sinkParams,
100137 NYdb::TDriver driver,
101138 std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
102139 IDqComputeActorAsyncOutput::ICallbacks* callbacks,
140+ const ::NMonitoring::TDynamicCounterPtr& counters,
103141 i64 freeSpace)
104142 : TActor<TDqPqWriteActor>(&TDqPqWriteActor::StateFunc)
105143 , OutputIndex(outputIndex)
106144 , TxId(txId)
145+ , Metrics(txId, taskId, counters)
107146 , SinkParams(std::move(sinkParams))
108147 , Driver(std::move(driver))
109148 , CredentialsProviderFactory(credentialsProviderFactory)
110149 , Callbacks(callbacks)
111- , LogPrefix(TStringBuilder() << " SelfId: " << this ->SelfId () << ", TxId: " << TxId << ", PQ sink. ")
150+ , LogPrefix(TStringBuilder() << " SelfId: " << this ->SelfId () << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
112151 , FreeSpace(freeSpace)
113152 , TopicClient(Driver, GetTopicClientSettings())
114153 {
@@ -162,6 +201,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
162201 }
163202
164203 FreeSpace -= messageSize;
204+ Metrics.InFlyData ->Inc ();
165205 Buffer.push (std::move (data));
166206 return true ;
167207 })) {
@@ -170,9 +210,13 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
170210
171211 if (checkpoint) {
172212 if (Buffer.empty () && WaitingAcks.empty ()) {
213+ SINK_LOG_D (" Send checkpoint state immediately" );
173214 Callbacks->OnAsyncOutputStateSaved (BuildState (*checkpoint), OutputIndex, *checkpoint);
174215 } else {
175- DeferredCheckpoints.emplace (NextSeqNo + Buffer.size () - 1 , *checkpoint);
216+ ui64 seqNo = NextSeqNo + Buffer.size () - 1 ;
217+ SINK_LOG_D (" Defer sending the checkpoint, seqNo: " << seqNo);
218+ Metrics.InFlyCheckpoints ->Inc ();
219+ DeferredCheckpoints.emplace (seqNo, *checkpoint);
176220 }
177221 }
178222
@@ -325,10 +369,9 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
325369 }
326370
327371 void WriteNextMessage (NYdb::NTopic::TContinuationToken&& token) {
328- SINK_LOG_T (" Write data: \" " << Buffer.front () << " \" with seq no " << NextSeqNo);
329372 WriteSession->Write (std::move (token), Buffer.front (), NextSeqNo++);
330373 auto itemSize = GetItemSize (Buffer.front ());
331- WaitingAcks.push (itemSize);
374+ WaitingAcks.emplace (itemSize, TInstant::Now () );
332375 EgressStats.Bytes += itemSize;
333376 Buffer.pop ();
334377 }
@@ -348,6 +391,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
348391
349392 std::optional<TIssues> operator ()(NYdb::NTopic::TWriteSessionEvent::TAcksEvent& ev) {
350393 if (ev.Acks .empty ()) {
394+ LOG_D (Self.LogPrefix << " Empty ack" );
351395 return std::nullopt ;
352396 }
353397
@@ -362,14 +406,23 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
362406 return issues;
363407 }
364408
365- Self.FreeSpace += Self.WaitingAcks .front ();
409+ if (it->State == NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_ALREADY_WRITTEN) {
410+ Self.Metrics .AlreadyWritten ->Inc ();
411+ }
412+
413+ const auto & ackInfo = Self.WaitingAcks .front ();
414+ Self.Metrics .LastAckLatency ->Set ((TInstant::Now () - ackInfo.StartTime ).MilliSeconds ());
415+ Self.Metrics .InFlyData ->Dec ();
416+ Self.FreeSpace += ackInfo.MessageSize ;
366417 Self.WaitingAcks .pop ();
367418
368419 if (!Self.DeferredCheckpoints .empty () && std::get<0 >(Self.DeferredCheckpoints .front ()) == it->SeqNo ) {
369420 Self.ConfirmedSeqNo = it->SeqNo ;
370421 const auto & checkpoint = std::get<1 >(Self.DeferredCheckpoints .front ());
422+ LOG_D (Self.LogPrefix << " Send a deferred checkpoint, seqNo: " << it->SeqNo );
371423 Self.Callbacks ->OnAsyncOutputStateSaved (Self.BuildState (checkpoint), Self.OutputIndex , checkpoint);
372424 Self.DeferredCheckpoints .pop ();
425+ Self.Metrics .InFlyCheckpoints ->Dec ();
373426 }
374427 }
375428 Self.ConfirmedSeqNo = ev.Acks .back ().SeqNo ;
@@ -402,6 +455,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
402455 const ui64 OutputIndex;
403456 TDqAsyncStats EgressStats;
404457 const TTxId TxId;
458+ TMetrics Metrics;
405459 const NPq::NProto::TDqPqTopicSink SinkParams;
406460 NYdb::TDriver Driver;
407461 std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
@@ -419,7 +473,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
419473 NThreading::TFuture<void > EventFuture;
420474 bool ShouldNotifyNewFreeSpace = false ;
421475 std::queue<TString> Buffer;
422- std::queue<i64 > WaitingAcks; // Size of items which are waiting for acks (used to update free space)
476+ std::queue<TAckInfo > WaitingAcks; // Size of items which are waiting for acks (used to update free space)
423477 std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints;
424478};
425479
@@ -428,10 +482,12 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
428482 ui64 outputIndex,
429483 TCollectStatsLevel statsLevel,
430484 TTxId txId,
485+ ui64 taskId,
431486 const THashMap<TString, TString>& secureParams,
432487 NYdb::TDriver driver,
433488 ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
434489 IDqComputeActorAsyncOutput::ICallbacks* callbacks,
490+ const ::NMonitoring::TDynamicCounterPtr& counters,
435491 i64 freeSpace)
436492{
437493 const TString& tokenName = settings.GetToken ().GetName ();
@@ -442,17 +498,19 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
442498 outputIndex,
443499 statsLevel,
444500 txId,
501+ taskId,
445502 std::move (settings),
446503 std::move (driver),
447504 CreateCredentialsProviderFactoryForStructuredToken (credentialsFactory, token, addBearerToToken),
448505 callbacks,
506+ counters,
449507 freeSpace);
450508 return {actor, actor};
451509}
452510
453- void RegisterDqPqWriteActorFactory (TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
511+ void RegisterDqPqWriteActorFactory (TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters ) {
454512 factory.RegisterSink <NPq::NProto::TDqPqTopicSink>(" PqSink" ,
455- [driver = std::move (driver), credentialsFactory = std::move (credentialsFactory)](
513+ [driver = std::move (driver), credentialsFactory = std::move (credentialsFactory), counters ](
456514 NPq::NProto::TDqPqTopicSink&& settings,
457515 IDqAsyncIoFactory::TSinkArguments&& args)
458516 {
@@ -462,10 +520,12 @@ void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver dri
462520 args.OutputIndex ,
463521 args.StatsLevel ,
464522 args.TxId ,
523+ args.TaskId ,
465524 args.SecureParams ,
466525 driver,
467526 credentialsFactory,
468- args.Callback
527+ args.Callback ,
528+ counters
469529 );
470530 });
471531}
0 commit comments