@@ -41,13 +41,26 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
4141 public NActors::TActorBootstrapped<TDqChannelStorageActor>
4242{
4343 using TBase = TActorBootstrapped<TDqChannelStorageActor>;
44+
45+ struct TWritingBlobInfo {
46+ ui64 Size;
47+ NThreading::TPromise<void > SavePromise;
48+ TInstant OpBegin;
49+ };
50+
51+ struct TLoadingBlobInfo {
52+ NThreading::TPromise<TBuffer> BlobPromise;
53+ TInstant OpBegin;
54+ };
4455public:
4556
46- TDqChannelStorageActor (TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
57+ TDqChannelStorageActor (TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
58+ TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem)
4759 : TxId_(txId)
4860 , ChannelId_(channelId)
4961 , WakeUpCallback_(std::move(wakeUpCallback))
5062 , ErrorCallback_(std::move(errorCallback))
63+ , SpillingTaskCounters_(spillingTaskCounters)
5164 , ActorSystem_(actorSystem)
5265 {}
5366
@@ -101,8 +114,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
101114 void HandleWork (TEvDqChannelSpilling::TEvGet::TPtr& ev) {
102115 auto & msg = *ev->Get ();
103116 LOG_T (" [TEvGet] blobId: " << msg.BlobId_ );
117+
118+ auto opBegin = TInstant::Now ();
104119
105- LoadingBlobs_.emplace (msg.BlobId_ , std::move (msg.Promise_ ));
120+ auto loadingBlobInfo = TLoadingBlobInfo{std::move (msg.Promise_ ), opBegin};
121+ LoadingBlobs_.emplace (msg.BlobId_ , std::move (loadingBlobInfo));
106122
107123 SendInternal (SpillingActorId_, new TEvDqSpilling::TEvRead (msg.BlobId_ ));
108124 }
@@ -111,7 +127,10 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
111127 auto & msg = *ev->Get ();
112128 LOG_T (" [TEvPut] blobId: " << msg.BlobId_ );
113129
114- WritingBlobs_.emplace (msg.BlobId_ , std::move (msg.Promise_ ));
130+ auto opBegin = TInstant::Now ();
131+
132+ auto writingBlobInfo = TWritingBlobInfo{msg.Blob_ .size (), std::move (msg.Promise_ ), opBegin};
133+ WritingBlobs_.emplace (msg.BlobId_ , std::move (writingBlobInfo));
115134
116135 SendInternal (SpillingActorId_, new TEvDqSpilling::TEvWrite (msg.BlobId_ , std::move (msg.Blob_ )));
117136 }
@@ -126,8 +145,15 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
126145 return ;
127146 }
128147
148+ auto & blobInfo = it->second ;
149+
150+ if (SpillingTaskCounters_) {
151+ SpillingTaskCounters_->ChannelWriteBytes += blobInfo.Size ;
152+ auto opDuration = TInstant::Now () - blobInfo.OpBegin ;
153+ SpillingTaskCounters_->ChannelWriteTime += opDuration.MilliSeconds ();
154+ }
129155 // Complete the future
130- it-> second .SetValue ();
156+ blobInfo. SavePromise .SetValue ();
131157 WritingBlobs_.erase (it);
132158
133159 WakeUpCallback_ ();
@@ -143,7 +169,14 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
143169 return ;
144170 }
145171
146- it->second .SetValue (std::move (msg.Blob ));
172+ auto & blobInfo = it->second ;
173+
174+ if (SpillingTaskCounters_) {
175+ auto opDuration = TInstant::Now () - blobInfo.OpBegin ;
176+ SpillingTaskCounters_->ChannelReadTime += opDuration.MilliSeconds ();
177+ }
178+
179+ blobInfo.BlobPromise .SetValue (std::move (msg.Blob ));
147180 LoadingBlobs_.erase (it);
148181
149182 WakeUpCallback_ ();
@@ -163,15 +196,17 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
163196private:
164197 const TTxId TxId_;
165198 const ui64 ChannelId_;
199+
166200 TWakeUpCallback WakeUpCallback_;
167201 TErrorCallback ErrorCallback_;
202+ TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters_;
168203 TActorId SpillingActorId_;
169204
170- // BlobId -> promise that blob is saved
171- std::unordered_map<ui64, NThreading::TPromise< void > > WritingBlobs_;
205+ // BlobId -> blob size + promise that blob is saved
206+ std::unordered_map<ui64, TWritingBlobInfo > WritingBlobs_;
172207
173208 // BlobId -> promise with requested blob
174- std::unordered_map<ui64, NThreading::TPromise<TBuffer> > LoadingBlobs_;
209+ std::unordered_map<ui64, TLoadingBlobInfo > LoadingBlobs_;
175210
176211 TActorSystem* ActorSystem_;
177212};
@@ -181,9 +216,10 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
181216IDqChannelStorageActor* CreateDqChannelStorageActor (TTxId txId, ui64 channelId,
182217 TWakeUpCallback&& wakeUpCallback,
183218 TErrorCallback&& errorCallback,
219+ TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters,
184220 NActors::TActorSystem* actorSystem)
185221{
186- return new TDqChannelStorageActor (txId, channelId, std::move (wakeUpCallback), std::move (errorCallback), actorSystem);
222+ return new TDqChannelStorageActor (txId, channelId, std::move (wakeUpCallback), std::move (errorCallback), spillingTaskCounters, actorSystem);
187223}
188224
189225} // namespace NYql::NDq
0 commit comments