11#include " actors.h"
22
3+ #include < library/cpp/colorizer/colors.h>
4+
35#include < ydb/core/kqp/common/simple/services.h>
6+ #include < ydb/core/kqp/rm_service/kqp_rm_service.h>
47
58
69namespace NKqpRun {
@@ -9,33 +12,33 @@ namespace {
912
1013class TRunScriptActorMock : public NActors ::TActorBootstrapped<TRunScriptActorMock> {
1114public:
12- TRunScriptActorMock (THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
13- NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
14- ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets)
15- : Request_(std::move(request))
15+ TRunScriptActorMock (TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback)
16+ : TargetNode_(request.TargetNode)
17+ , Request_(std::move(request.Event))
1618 , Promise_(promise)
1719 , ResultRowsLimit_(std::numeric_limits<ui64>::max())
1820 , ResultSizeLimit_(std::numeric_limits<i64 >::max())
19- , ResultSets_(resultSets )
21+ , ProgressCallback_(progressCallback )
2022 {
21- if (resultRowsLimit ) {
22- ResultRowsLimit_ = resultRowsLimit ;
23+ if (request. ResultRowsLimit ) {
24+ ResultRowsLimit_ = request. ResultRowsLimit ;
2325 }
24- if (resultSizeLimit ) {
25- ResultSizeLimit_ = resultSizeLimit ;
26+ if (request. ResultSizeLimit ) {
27+ ResultSizeLimit_ = request. ResultSizeLimit ;
2628 }
2729 }
2830
2931 void Bootstrap () {
3032 NActors::ActorIdToProto (SelfId (), Request_->Record .MutableRequestActorId ());
31- Send (NKikimr::NKqp::MakeKqpProxyID (SelfId (). NodeId () ), std::move (Request_));
33+ Send (NKikimr::NKqp::MakeKqpProxyID (TargetNode_ ), std::move (Request_));
3234
3335 Become (&TRunScriptActorMock::StateFunc);
3436 }
3537
3638 STRICT_STFUNC (StateFunc,
3739 hFunc (NKikimr::NKqp::TEvKqpExecuter::TEvStreamData, Handle);
3840 hFunc (NKikimr::NKqp::TEvKqp::TEvQueryResponse, Handle);
41+ hFunc (NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
3942 )
4043
4144 void Handle (NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
@@ -46,47 +49,233 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
4649 auto resultSetIndex = ev->Get ()->Record .GetQueryResultIndex ();
4750 if (resultSetIndex >= ResultSets_.size ()) {
4851 ResultSets_.resize (resultSetIndex + 1 );
52+ ResultSetSizes_.resize (resultSetIndex + 1 , 0 );
4953 }
5054
5155 if (!ResultSets_[resultSetIndex].truncated ()) {
56+ ui64& resultSetSize = ResultSetSizes_[resultSetIndex];
5257 for (auto & row : *ev->Get ()->Record .MutableResultSet ()->mutable_rows ()) {
5358 if (static_cast <ui64>(ResultSets_[resultSetIndex].rows_size ()) >= ResultRowsLimit_) {
5459 ResultSets_[resultSetIndex].set_truncated (true );
5560 break ;
5661 }
5762
58- if (ResultSets_[resultSetIndex].ByteSizeLong () + row.ByteSizeLong () > ResultSizeLimit_) {
63+ auto rowSize = row.ByteSizeLong ();
64+ if (resultSetSize + rowSize > ResultSizeLimit_) {
5965 ResultSets_[resultSetIndex].set_truncated (true );
6066 break ;
6167 }
6268
69+ resultSetSize += rowSize;
6370 *ResultSets_[resultSetIndex].add_rows () = std::move (row);
6471 }
65- *ResultSets_[resultSetIndex].mutable_columns () = ev->Get ()->Record .GetResultSet ().columns ();
72+ if (!ResultSets_[resultSetIndex].columns_size ()) {
73+ *ResultSets_[resultSetIndex].mutable_columns () = ev->Get ()->Record .GetResultSet ().columns ();
74+ }
6675 }
6776
6877 Send (ev->Sender , response.Release ());
6978 }
7079
7180 void Handle (NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
72- Promise_.SetValue (std::move (ev));
81+ Promise_.SetValue (TQueryResponse{. Response = std::move (ev), . ResultSets = std::move (ResultSets_)} );
7382 PassAway ();
7483 }
7584
85+ void Handle (NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
86+ if (ProgressCallback_) {
87+ ProgressCallback_ (ev->Get ()->Record );
88+ }
89+ }
90+
7691private:
77- THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
78- NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
92+ ui32 TargetNode_ = 0 ;
93+ std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
94+ NThreading::TPromise<TQueryResponse> Promise_;
7995 ui64 ResultRowsLimit_;
8096 ui64 ResultSizeLimit_;
81- std::vector<Ydb::ResultSet>& ResultSets_;
97+ TProgressCallback ProgressCallback_;
98+ std::vector<Ydb::ResultSet> ResultSets_;
99+ std::vector<ui64> ResultSetSizes_;
100+ };
101+
102+ class TAsyncQueryRunnerActor : public NActors ::TActor<TAsyncQueryRunnerActor> {
103+ using TBase = NActors::TActor<TAsyncQueryRunnerActor>;
104+
105+ public:
106+ TAsyncQueryRunnerActor (ui64 inFlightLimit)
107+ : TBase(&TAsyncQueryRunnerActor::StateFunc)
108+ , InFlightLimit_(inFlightLimit)
109+ {
110+ RunningRequests_.reserve (InFlightLimit_);
111+ }
112+
113+ STRICT_STFUNC (StateFunc,
114+ hFunc (TEvPrivate::TEvStartAsyncQuery, Handle);
115+ hFunc (TEvPrivate::TEvAsyncQueryFinished, Handle);
116+ hFunc (TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle);
117+ )
118+
119+ void Handle (TEvPrivate::TEvStartAsyncQuery::TPtr& ev) {
120+ DelayedRequests_.emplace (std::move (ev));
121+ StartDelayedRequests ();
122+ }
123+
124+ void Handle (TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
125+ const ui64 requestId = ev->Get ()->RequestId ;
126+ RunningRequests_.erase (requestId);
127+
128+ const auto & response = ev->Get ()->Result .Response ->Get ()->Record .GetRef ();
129+ const auto status = response.GetYdbStatus ();
130+
131+ if (status == Ydb::StatusIds::SUCCESS) {
132+ Completed_++;
133+ Cout << CoutColors_.Green () << TInstant::Now ().ToIsoStringLocal () << " Request #" << requestId << " completed. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << Endl;
134+ } else {
135+ Failed_++;
136+ NYql::TIssues issues;
137+ NYql::IssuesFromMessage (response.GetResponse ().GetQueryIssues (), issues);
138+ Cout << CoutColors_.Red () << TInstant::Now ().ToIsoStringLocal () << " Request #" << requestId << " failed " << status << " . " << CoutColors_.Yellow () << GetInfoString () << " \n " << CoutColors_.Red () << " Issues:\n " << issues.ToString () << CoutColors_.Default ();
139+ }
140+
141+ StartDelayedRequests ();
142+ TryFinalize ();
143+ }
144+
145+ void Handle (TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) {
146+ FinalizePromise_ = ev->Get ()->FinalizePromise ;
147+ if (!TryFinalize ()) {
148+ Cout << CoutColors_.Yellow () << TInstant::Now ().ToIsoStringLocal () << " Waiting for " << DelayedRequests_.size () + RunningRequests_.size () << " async queries..." << CoutColors_.Default () << Endl;
149+ }
150+ }
151+
152+ private:
153+ void StartDelayedRequests () {
154+ while (!DelayedRequests_.empty () && (!InFlightLimit_ || RunningRequests_.size () < InFlightLimit_)) {
155+ auto request = std::move (DelayedRequests_.front ());
156+ DelayedRequests_.pop ();
157+
158+ auto promise = NThreading::NewPromise<TQueryResponse>();
159+ Register (CreateRunScriptActorMock (std::move (request->Get ()->Request ), promise, nullptr ));
160+ RunningRequests_[RequestId_] = promise.GetFuture ().Subscribe ([id = RequestId_, this ](const NThreading::TFuture<TQueryResponse>& f) {
161+ Send (SelfId (), new TEvPrivate::TEvAsyncQueryFinished (id, std::move (f.GetValue ())));
162+ });
163+
164+ MaxInFlight_ = std::max (MaxInFlight_, RunningRequests_.size ());
165+ Cout << TStringBuilder () << CoutColors_.Cyan () << TInstant::Now ().ToIsoStringLocal () << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow () << GetInfoString () << CoutColors_.Default () << " \n " ;
166+
167+ RequestId_++;
168+ request->Get ()->StartPromise .SetValue ();
169+ }
170+ }
171+
172+ bool TryFinalize () {
173+ if (!FinalizePromise_ || !RunningRequests_.empty ()) {
174+ return false ;
175+ }
176+
177+ FinalizePromise_->SetValue ();
178+ PassAway ();
179+ return true ;
180+ }
181+
182+ TString GetInfoString () const {
183+ return TStringBuilder () << " completed: " << Completed_ << " , failed: " << Failed_ << " , in flight: " << RunningRequests_.size () << " , max in flight: " << MaxInFlight_ << " , spend time: " << TInstant::Now () - StartTime_;
184+ }
185+
186+ private:
187+ const ui64 InFlightLimit_;
188+ const TInstant StartTime_ = TInstant::Now();
189+ const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
190+
191+ std::optional<NThreading::TPromise<void >> FinalizePromise_;
192+ std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
193+ std::unordered_map<ui64, NThreading::TFuture<TQueryResponse>> RunningRequests_;
194+
195+ ui64 RequestId_ = 1 ;
196+ ui64 MaxInFlight_ = 0 ;
197+ ui64 Completed_ = 0 ;
198+ ui64 Failed_ = 0 ;
199+ };
200+
201+ class TResourcesWaiterActor : public NActors ::TActorBootstrapped<TResourcesWaiterActor> {
202+ static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10 );
203+
204+ public:
205+ TResourcesWaiterActor (NThreading::TPromise<void > promise, i32 expectedNodeCount)
206+ : ExpectedNodeCount_(expectedNodeCount)
207+ , Promise_(promise)
208+ {}
209+
210+ void Bootstrap () {
211+ Become (&TResourcesWaiterActor::StateFunc);
212+ CheckResourcesPublish ();
213+ }
214+
215+ void Handle (NActors::TEvents::TEvWakeup::TPtr&) {
216+ CheckResourcesPublish ();
217+ }
218+
219+ void Handle (TEvPrivate::TEvResourcesInfo::TPtr& ev) {
220+ if (ev->Get ()->NodeCount == ExpectedNodeCount_) {
221+ Promise_.SetValue ();
222+ PassAway ();
223+ return ;
224+ }
225+
226+ Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
227+ }
228+
229+ STRICT_STFUNC (StateFunc,
230+ hFunc (NActors::TEvents::TEvWakeup, Handle);
231+ hFunc (TEvPrivate::TEvResourcesInfo, Handle);
232+ )
233+
234+ private:
235+ void CheckResourcesPublish() {
236+ GetResourceManager ();
237+
238+ if (!ResourceManager_) {
239+ Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
240+ return ;
241+ }
242+
243+ UpdateResourcesInfo ();
244+ }
245+
246+ void GetResourceManager () {
247+ if (ResourceManager_) {
248+ return ;
249+ }
250+ ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager (SelfId ().NodeId ());
251+ }
252+
253+ void UpdateResourcesInfo () const {
254+ ResourceManager_->RequestClusterResourcesInfo (
255+ [selfId = SelfId (), actorContext = ActorContext ()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
256+ actorContext.Send (selfId, new TEvPrivate::TEvResourcesInfo (resources.size ()));
257+ });
258+ }
259+
260+ private:
261+ const i32 ExpectedNodeCount_;
262+ NThreading::TPromise<void > Promise_;
263+
264+ std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
82265};
83266
84267} // anonymous namespace
85268
86- NActors::IActor* CreateRunScriptActorMock (THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
87- NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
88- ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets) {
89- return new TRunScriptActorMock (std::move (request), promise, resultRowsLimit, resultSizeLimit, resultSets);
269+ NActors::IActor* CreateRunScriptActorMock (TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback) {
270+ return new TRunScriptActorMock (std::move (request), promise, progressCallback);
271+ }
272+
273+ NActors::IActor* CreateAsyncQueryRunnerActor (ui64 inFlightLimit) {
274+ return new TAsyncQueryRunnerActor (inFlightLimit);
275+ }
276+
277+ NActors::IActor* CreateResourcesWaiterActor (NThreading::TPromise<void > promise, i32 expectedNodeCount) {
278+ return new TResourcesWaiterActor (promise, expectedNodeCount);
90279}
91280
92281} // namespace NKqpRun
0 commit comments