@@ -85,6 +85,30 @@ struct TEvPrivate {
8585} // namespace
8686
8787class TDqPqReadActor : public NActors ::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
88+ struct TMetrics {
89+ TMetrics (const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
90+ : TxId(std::visit([](auto arg) { return ToString (arg); }, txId))
91+ , Counters(counters) {
92+ SubGroup = Counters->GetSubgroup (" sink" , " PqRead" );
93+ auto sink = SubGroup->GetSubgroup (" tx_id" , TxId);
94+ auto task = sink->GetSubgroup (" task_id" , ToString (taskId));
95+ InFlyAsyncInputData = task->GetCounter (" InFlyAsyncInputData" );
96+ InFlySubscribe = task->GetCounter (" InFlySubscribe" );
97+ AsyncInputDataRate = task->GetCounter (" AsyncInputDataRate" , true );
98+ }
99+
100+ ~TMetrics () {
101+ SubGroup->RemoveSubgroup (" id" , TxId);
102+ }
103+
104+ TString TxId;
105+ ::NMonitoring::TDynamicCounterPtr Counters;
106+ ::NMonitoring::TDynamicCounterPtr SubGroup;
107+ ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
108+ ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
109+ ::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
110+ };
111+
88112public:
89113 using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
90114 using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
@@ -100,10 +124,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
100124 NYdb::TDriver driver,
101125 std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
102126 const NActors::TActorId& computeActorId,
127+ const ::NMonitoring::TDynamicCounterPtr& counters,
103128 i64 bufferSize)
104129 : TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
105130 , InputIndex(inputIndex)
106131 , TxId(txId)
132+ , Metrics(txId, taskId, counters)
107133 , BufferSize(bufferSize)
108134 , HolderFactory(holderFactory)
109135 , LogPrefix(TStringBuilder() << " SelfId: " << this ->SelfId () << ", TxId: " << TxId << ", task: " << taskId << ". PQ source. ")
@@ -245,9 +271,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
245271 hFunc (TEvPrivate::TEvSourceDataReady, Handle);
246272 )
247273
248- void Handle (TEvPrivate::TEvSourceDataReady::TPtr&) {
274+ void Handle (TEvPrivate::TEvSourceDataReady::TPtr& ev ) {
249275 SRC_LOG_T (" SessionId: " << GetSessionId () << " Source data ready" );
250276 SubscribedOnEvent = false ;
277+ if (ev.Get ()->Cookie ) {
278+ Metrics.InFlySubscribe ->Dec ();
279+ }
280+ Metrics.InFlyAsyncInputData ->Set (1 );
281+ Metrics.AsyncInputDataRate ->Inc ();
251282 Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex));
252283 }
253284
@@ -282,6 +313,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
282313 }
283314
284315 i64 GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool &, i64 freeSpace) override {
316+ Metrics.InFlyAsyncInputData ->Set (0 );
285317 SRC_LOG_T (" SessionId: " << GetSessionId () << " GetAsyncInputData freeSpace = " << freeSpace);
286318
287319 const auto now = TInstant::Now ();
@@ -387,9 +419,10 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
387419 void SubscribeOnNextEvent () {
388420 if (!SubscribedOnEvent) {
389421 SubscribedOnEvent = true ;
422+ Metrics.InFlySubscribe ->Inc ();
390423 NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem ();
391424 EventFuture = GetReadSession ().WaitEvent ().Subscribe ([actorSystem, selfId = SelfId ()](const auto &){
392- actorSystem->Send (selfId, new TEvPrivate::TEvSourceDataReady ());
425+ actorSystem->Send (selfId, new TEvPrivate::TEvSourceDataReady (), 0 , 1 );
393426 });
394427 }
395428 }
@@ -595,6 +628,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
595628 const ui64 InputIndex;
596629 TDqAsyncStats IngressStats;
597630 const TTxId TxId;
631+ TMetrics Metrics;
598632 const i64 BufferSize;
599633 const THolderFactory& HolderFactory;
600634 const TString LogPrefix;
@@ -629,6 +663,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
629663 ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
630664 const NActors::TActorId& computeActorId,
631665 const NKikimr::NMiniKQL::THolderFactory& holderFactory,
666+ const ::NMonitoring::TDynamicCounterPtr& counters,
632667 i64 bufferSize
633668 )
634669{
@@ -653,15 +688,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
653688 std::move (driver),
654689 CreateCredentialsProviderFactoryForStructuredToken (credentialsFactory, token, addBearerToToken),
655690 computeActorId,
691+ counters,
656692 bufferSize
657693 );
658694
659695 return {actor, actor};
660696}
661697
662- void RegisterDqPqReadActorFactory (TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
698+ void RegisterDqPqReadActorFactory (TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters ) {
663699 factory.RegisterSource <NPq::NProto::TDqPqTopicSource>(" PqSource" ,
664- [driver = std::move (driver), credentialsFactory = std::move (credentialsFactory)](
700+ [driver = std::move (driver), credentialsFactory = std::move (credentialsFactory), counters ](
665701 NPq::NProto::TDqPqTopicSource&& settings,
666702 IDqAsyncIoFactory::TSourceArguments&& args)
667703 {
@@ -678,6 +714,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
678714 credentialsFactory,
679715 args.ComputeActorId ,
680716 args.HolderFactory ,
717+ counters,
681718 PQReadDefaultFreeSpace);
682719 });
683720
0 commit comments