Skip to content

Commit 73e9639

Browse files
committed
Added AsyncQueryRunnerActor
1 parent ce10854 commit 73e9639

File tree

8 files changed

+263
-154
lines changed

8 files changed

+263
-154
lines changed

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
8181
Sleep(executionOptions.LoopDelay);
8282
}
8383

84-
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
84+
const auto executionCase = executionOptions.GetExecutionCase(id);
85+
if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) {
8586
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script";
8687
if (numberQueries > 1) {
8788
Cout << " " << id;
@@ -92,7 +93,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
9293
Cout << "..." << colors.Default() << Endl;
9394
}
9495

95-
switch (executionOptions.GetExecutionCase(id)) {
96+
switch (executionCase) {
9697
case TExecutionOptions::EExecutionCase::GenericScript:
9798
if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
9899
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
@@ -381,8 +382,8 @@ class TMain : public TMainClassArgs {
381382
});
382383
options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
383384
.RequiredArgument("uint")
384-
.DefaultValue(RunnerOptions.InFlightLimit)
385-
.StoreResult(&RunnerOptions.InFlightLimit);
385+
.DefaultValue(RunnerOptions.YdbSettings.InFlightLimit)
386+
.StoreResult(&RunnerOptions.YdbSettings.InFlightLimit);
386387

387388
TChoices<NKikimrKqp::EQueryAction> scriptAction({
388389
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},

ydb/tests/tools/kqprun/src/actors.cpp

Lines changed: 111 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "actors.h"
22

3+
#include <library/cpp/colorizer/colors.h>
4+
35
#include <ydb/core/kqp/common/simple/services.h>
46
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
57

@@ -10,26 +12,25 @@ namespace {
1012

1113
class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMock> {
1214
public:
13-
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
14-
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
15-
TProgressCallback progressCallback)
16-
: Request_(std::move(request))
15+
TRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback)
16+
: TargetNode(request.TargetNode)
17+
, Request_(std::move(request.Event))
1718
, Promise_(promise)
1819
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
1920
, ResultSizeLimit_(std::numeric_limits<i64>::max())
2021
, ProgressCallback_(progressCallback)
2122
{
22-
if (resultRowsLimit) {
23-
ResultRowsLimit_ = resultRowsLimit;
23+
if (request.ResultRowsLimit) {
24+
ResultRowsLimit_ = request.ResultRowsLimit;
2425
}
25-
if (resultSizeLimit) {
26-
ResultSizeLimit_ = resultSizeLimit;
26+
if (request.ResultSizeLimit) {
27+
ResultSizeLimit_ = request.ResultSizeLimit;
2728
}
2829
}
2930

3031
void Bootstrap() {
3132
NActors::ActorIdToProto(SelfId(), Request_->Record.MutableRequestActorId());
32-
Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), std::move(Request_));
33+
Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode), std::move(Request_));
3334

3435
Become(&TRunScriptActorMock::StateFunc);
3536
}
@@ -88,7 +89,8 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
8889
}
8990

9091
private:
91-
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
92+
ui32 TargetNode = 0;
93+
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
9294
NThreading::TPromise<TQueryResponse> Promise_;
9395
ui64 ResultRowsLimit_;
9496
ui64 ResultSizeLimit_;
@@ -97,25 +99,104 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
9799
std::vector<ui64> ResultSetSizes_;
98100
};
99101

100-
class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
101-
struct TEvPrivate {
102-
enum EEv : ui32 {
103-
EvResourcesInfo = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
102+
class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
103+
using TBase = NActors::TActor<TAsyncQueryRunnerActor>;
104+
105+
public:
106+
TAsyncQueryRunnerActor(ui64 inFlightLimit)
107+
: TBase(&TAsyncQueryRunnerActor::StateFunc)
108+
, InFlightLimit_(inFlightLimit)
109+
{}
110+
111+
STRICT_STFUNC(StateFunc,
112+
hFunc(TEvPrivate::TEvStartAsyncQuery, Handle);
113+
hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle);
114+
hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle);
115+
)
116+
117+
void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) {
118+
DelayedRequests_.emplace(std::move(ev));
119+
StartDelayedRequests();
120+
}
121+
122+
void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
123+
const ui64 requestId = ev->Get()->RequestId;
124+
RunningRequests_.erase(requestId);
125+
126+
const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef();
127+
const auto status = response.GetYdbStatus();
128+
129+
if (status == Ydb::StatusIds::SUCCESS) {
130+
Completed_++;
131+
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
132+
} else {
133+
Failed_++;
134+
NYql::TIssues issues;
135+
NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues);
136+
Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default();
137+
}
138+
139+
StartDelayedRequests();
140+
TryFinalize();
141+
}
142+
143+
void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) {
144+
FinalizePromise_ = ev->Get()->FinalizePromise;
145+
if (!TryFinalize()) {
146+
Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl;
147+
}
148+
}
104149

105-
EvEnd
106-
};
150+
private:
151+
void StartDelayedRequests() {
152+
while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) {
153+
auto request = std::move(DelayedRequests_.front());
154+
DelayedRequests_.pop();
155+
156+
auto promise = NThreading::NewPromise<TQueryResponse>();
157+
Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr));
158+
RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
159+
Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
160+
});
161+
162+
MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
163+
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
164+
165+
RequestId_++;
166+
request->Get()->StartPromise.SetValue();
167+
}
168+
}
107169

108-
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
170+
bool TryFinalize() {
171+
if (!FinalizePromise_ || !RunningRequests_.empty()) {
172+
return false;
173+
}
109174

110-
struct TEvResourcesInfo : public NActors::TEventLocal<TEvResourcesInfo, EvResourcesInfo> {
111-
explicit TEvResourcesInfo(i32 nodeCount)
112-
: NodeCount(nodeCount)
113-
{}
175+
FinalizePromise_->SetValue();
176+
PassAway();
177+
return true;
178+
}
114179

115-
const i32 NodeCount;
116-
};
117-
};
180+
TString GetInfoString() const {
181+
return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
182+
}
183+
184+
private:
185+
const ui64 InFlightLimit_;
186+
const TInstant StartTime_ = TInstant::Now();
187+
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
188+
189+
std::optional<NThreading::TPromise<void>> FinalizePromise_;
190+
std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
191+
std::unordered_map<ui64, NThreading::TFuture<TQueryResponse>> RunningRequests_;
192+
193+
ui64 RequestId_ = 1;
194+
ui64 MaxInFlight_ = 0;
195+
ui64 Completed_ = 0;
196+
ui64 Failed_ = 0;
197+
};
118198

199+
class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
119200
static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);
120201

121202
public:
@@ -183,10 +264,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
183264

184265
} // anonymous namespace
185266

186-
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
187-
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
188-
TProgressCallback progressCallback) {
189-
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, progressCallback);
267+
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback) {
268+
return new TRunScriptActorMock(std::move(request), promise, progressCallback);
269+
}
270+
271+
NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) {
272+
return new TAsyncQueryRunnerActor(inFlightLimit);
190273
}
191274

192275
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {

ydb/tests/tools/kqprun/src/actors.h

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,68 @@ struct TQueryResponse {
99
std::vector<Ydb::ResultSet> ResultSets;
1010
};
1111

12+
struct TQueryRequest {
13+
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Event;
14+
ui32 TargetNode;
15+
ui64 ResultRowsLimit;
16+
ui64 ResultSizeLimit;
17+
};
18+
19+
struct TEvPrivate {
20+
enum EEv : ui32 {
21+
EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
22+
EvAsyncQueryFinished,
23+
EvFinalizeAsyncQueryRunner,
24+
25+
EvResourcesInfo,
26+
27+
EvEnd
28+
};
29+
30+
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
31+
32+
struct TEvStartAsyncQuery : public NActors::TEventLocal<TEvStartAsyncQuery, EvStartAsyncQuery> {
33+
TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise<void> startPromise)
34+
: Request(std::move(request))
35+
, StartPromise(startPromise)
36+
{}
37+
38+
TQueryRequest Request;
39+
NThreading::TPromise<void> StartPromise;
40+
};
41+
42+
struct TEvAsyncQueryFinished : public NActors::TEventLocal<TEvAsyncQueryFinished, EvAsyncQueryFinished> {
43+
TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result)
44+
: RequestId(requestId)
45+
, Result(result)
46+
{}
47+
48+
const ui64 RequestId;
49+
const TQueryResponse Result;
50+
};
51+
52+
struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal<TEvFinalizeAsyncQueryRunner, EvFinalizeAsyncQueryRunner> {
53+
explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise<void> finalizePromise)
54+
: FinalizePromise(finalizePromise)
55+
{}
56+
57+
NThreading::TPromise<void> FinalizePromise;
58+
};
59+
60+
struct TEvResourcesInfo : public NActors::TEventLocal<TEvResourcesInfo, EvResourcesInfo> {
61+
explicit TEvResourcesInfo(i32 nodeCount)
62+
: NodeCount(nodeCount)
63+
{}
64+
65+
const i32 NodeCount;
66+
};
67+
};
68+
1269
using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgress&)>;
1370

14-
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
15-
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
16-
TProgressCallback progressCallback);
71+
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback);
72+
73+
NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit);
1774

1875
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
1976

ydb/tests/tools/kqprun/src/common.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ struct TYdbSetupSettings {
2828
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
2929
TIntrusivePtr<NYql::IYtGateway> YtGateway;
3030
NKikimrConfig::TAppConfig AppConfig;
31+
32+
ui64 InFlightLimit = 0;
3133
};
3234

3335

@@ -55,8 +57,6 @@ struct TRunnerOptions {
5557
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;
5658
ETraceOptType TraceOptType = ETraceOptType::Disabled;
5759

58-
ui64 InFlightLimit = 0;
59-
6060
TYdbSetupSettings YdbSettings;
6161
};
6262

0 commit comments

Comments
 (0)