@@ -102,12 +102,17 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
102102class TAsyncQueryRunnerActor : public NActors ::TActor<TAsyncQueryRunnerActor> {
103103 using TBase = NActors::TActor<TAsyncQueryRunnerActor>;
104104
105+ struct TRequestInfo {
106+ TInstant StartTime;
107+ NThreading::TFuture<TQueryResponse> RequestFuture;
108+ };
109+
105110public:
106- TAsyncQueryRunnerActor (ui64 inFlightLimit )
111+ TAsyncQueryRunnerActor (const TAsyncQueriesSettings& settings )
107112 : TBase(&TAsyncQueryRunnerActor::StateFunc)
108- , InFlightLimit_(inFlightLimit )
113+ , Settings_(settings )
109114 {
110- RunningRequests_.reserve (InFlightLimit_ );
115+ RunningRequests_.reserve (Settings_. InFlightLimit );
111116 }
112117
113118 STRICT_STFUNC (StateFunc,
@@ -123,21 +128,29 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
123128
124129 void Handle (TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
125130 const ui64 requestId = ev->Get ()->RequestId ;
131+ RequestsLatency_ += TInstant::Now () - RunningRequests_[requestId].StartTime ;
126132 RunningRequests_.erase (requestId);
127133
128134 const auto & response = ev->Get ()->Result .Response ->Get ()->Record .GetRef ();
129135 const auto status = response.GetYdbStatus ();
130136
131137 if (status == Ydb::StatusIds::SUCCESS) {
132138 Completed_++;
133- Cout << CoutColors_.Green () << TInstant::Now ().ToIsoStringLocal () << " Request #" << requestId << " completed. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << Endl;
139+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
140+ Cout << CoutColors_.Green () << TInstant::Now ().ToIsoStringLocal () << " Request #" << requestId << " completed. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << Endl;
141+ }
134142 } else {
135143 Failed_++;
136144 NYql::TIssues issues;
137145 NYql::IssuesFromMessage (response.GetResponse ().GetQueryIssues (), issues);
138146 Cout << CoutColors_.Red () << TInstant::Now ().ToIsoStringLocal () << " Request #" << requestId << " failed " << status << " . " << CoutColors_.Yellow () << GetInfoString () << " \n " << CoutColors_.Red () << " Issues:\n " << issues.ToString () << CoutColors_.Default ();
139147 }
140148
149+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now () - LastReportTime_ > TDuration::Seconds (1 )) {
150+ Cout << CoutColors_.Green () << TInstant::Now ().ToIsoStringLocal () << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << Endl;
151+ LastReportTime_ = TInstant::Now ();
152+ }
153+
141154 StartDelayedRequests ();
142155 TryFinalize ();
143156 }
@@ -151,18 +164,23 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
151164
152165private:
153166 void StartDelayedRequests () {
154- while (!DelayedRequests_.empty () && (!InFlightLimit_ || RunningRequests_.size () < InFlightLimit_ )) {
167+ while (!DelayedRequests_.empty () && (!Settings_. InFlightLimit || RunningRequests_.size () < Settings_. InFlightLimit )) {
155168 auto request = std::move (DelayedRequests_.front ());
156169 DelayedRequests_.pop ();
157170
158171 auto promise = NThreading::NewPromise<TQueryResponse>();
159172 Register (CreateRunScriptActorMock (std::move (request->Get ()->Request ), promise, nullptr ));
160- RunningRequests_[RequestId_] = promise.GetFuture ().Subscribe ([id = RequestId_, this ](const NThreading::TFuture<TQueryResponse>& f) {
161- Send (SelfId (), new TEvPrivate::TEvAsyncQueryFinished (id, std::move (f.GetValue ())));
162- });
173+ RunningRequests_[RequestId_] = {
174+ .StartTime = TInstant::Now (),
175+ .RequestFuture = promise.GetFuture ().Subscribe ([id = RequestId_, this ](const NThreading::TFuture<TQueryResponse>& f) {
176+ Send (SelfId (), new TEvPrivate::TEvAsyncQueryFinished (id, std::move (f.GetValue ())));
177+ })
178+ };
163179
164180 MaxInFlight_ = std::max (MaxInFlight_, RunningRequests_.size ());
165- Cout << TStringBuilder () << CoutColors_.Cyan () << TInstant::Now ().ToIsoStringLocal () << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << " \n " ;
181+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
182+ Cout << TStringBuilder () << CoutColors_.Cyan () << TInstant::Now ().ToIsoStringLocal () << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << " \n " ;
183+ }
166184
167185 RequestId_++;
168186 request->Get ()->StartPromise .SetValue ();
@@ -174,28 +192,38 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
174192 return false ;
175193 }
176194
195+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) {
196+ Cout << TStringBuilder () << CoutColors_.Cyan () << TInstant::Now ().ToIsoStringLocal () << " All async requests finished. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << " \n " ;
197+ }
198+
177199 FinalizePromise_->SetValue ();
178200 PassAway ();
179201 return true ;
180202 }
181203
182204 TString GetInfoString () const {
183- return TStringBuilder () << " completed: " << Completed_ << " , failed: " << Failed_ << " , in flight: " << RunningRequests_.size () << " , max in flight: " << MaxInFlight_ << " , spend time: " << TInstant::Now () - StartTime_;
205+ TStringBuilder result = TStringBuilder () << " completed: " << Completed_ << " , failed: " << Failed_ << " , in flight: " << RunningRequests_.size () << " , max in flight: " << MaxInFlight_ << " , spend time: " << TInstant::Now () - StartTime_;
206+ if (const auto amountRequests = Completed_ + Failed_) {
207+ result << " , average latency: " << RequestsLatency_ / amountRequests;
208+ }
209+ return result;
184210 }
185211
186212private:
187- const ui64 InFlightLimit_ ;
213+ const TAsyncQueriesSettings Settings_ ;
188214 const TInstant StartTime_ = TInstant::Now();
189215 const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
190216
191217 std::optional<NThreading::TPromise<void >> FinalizePromise_;
192218 std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
193- std::unordered_map<ui64, NThreading::TFuture<TQueryResponse>> RunningRequests_;
219+ std::unordered_map<ui64, TRequestInfo> RunningRequests_;
220+ TInstant LastReportTime_ = TInstant::Now();
194221
195222 ui64 RequestId_ = 1 ;
196223 ui64 MaxInFlight_ = 0 ;
197224 ui64 Completed_ = 0 ;
198225 ui64 Failed_ = 0 ;
226+ TDuration RequestsLatency_;
199227};
200228
201229class TResourcesWaiterActor : public NActors ::TActorBootstrapped<TResourcesWaiterActor> {
@@ -270,8 +298,8 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr
270298 return new TRunScriptActorMock (std::move (request), promise, progressCallback);
271299}
272300
273- NActors::IActor* CreateAsyncQueryRunnerActor (ui64 inFlightLimit ) {
274- return new TAsyncQueryRunnerActor (inFlightLimit );
301+ NActors::IActor* CreateAsyncQueryRunnerActor (const TAsyncQueriesSettings& settings ) {
302+ return new TAsyncQueryRunnerActor (settings );
275303}
276304
277305NActors::IActor* CreateResourcesWaiterActor (NThreading::TPromise<void > promise, i32 expectedNodeCount) {
0 commit comments