Skip to content

Commit 6f6984f

Browse files
committed
Added async queries
1 parent e82ac97 commit 6f6984f

File tree

8 files changed

+172
-38
lines changed

8 files changed

+172
-38
lines changed

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ struct TExecutionOptions {
2424
enum class EExecutionCase {
2525
GenericScript,
2626
GenericQuery,
27-
YqlScript
27+
YqlScript,
28+
AsyncQuery
2829
};
2930

3031
std::vector<TString> ScriptQueries;
@@ -40,7 +41,16 @@ struct TExecutionOptions {
4041
const TString TraceId = "kqprun_" + CreateGuidAsString();
4142

4243
bool HasResults() const {
43-
return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
44+
if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) {
45+
return false;
46+
}
47+
48+
for (EExecutionCase executionCase : ExecutionCases) {
49+
if (executionCase != EExecutionCase::AsyncQuery) {
50+
return true;
51+
}
52+
}
53+
return false;
4454
}
4555

4656
EExecutionCase GetExecutionCase(size_t index) const {
@@ -71,14 +81,16 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
7181
Sleep(executionOptions.LoopDelay);
7282
}
7383

74-
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script";
75-
if (numberQueries > 1) {
76-
Cout << " " << id;
77-
}
78-
if (numberLoops != 1) {
79-
Cout << ", loop " << queryId / numberQueries;
84+
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
85+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script";
86+
if (numberQueries > 1) {
87+
Cout << " " << id;
88+
}
89+
if (numberLoops != 1) {
90+
Cout << ", loop " << queryId / numberQueries;
91+
}
92+
Cout << "..." << colors.Default() << Endl;
8093
}
81-
Cout << "..." << colors.Default() << Endl;
8294

8395
switch (executionOptions.GetExecutionCase(id)) {
8496
case TExecutionOptions::EExecutionCase::GenericScript:
@@ -108,8 +120,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
108120
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
109121
}
110122
break;
123+
124+
case TExecutionOptions::EExecutionCase::AsyncQuery:
125+
runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId);
126+
break;
111127
}
112128
}
129+
runner.WaitAsyncQueries();
113130

114131
if (executionOptions.HasResults()) {
115132
try {
@@ -351,7 +368,8 @@ class TMain : public TMainClassArgs {
351368
TChoices<TExecutionOptions::EExecutionCase> executionCase({
352369
{"script", TExecutionOptions::EExecutionCase::GenericScript},
353370
{"query", TExecutionOptions::EExecutionCase::GenericQuery},
354-
{"yql-script", TExecutionOptions::EExecutionCase::YqlScript}
371+
{"yql-script", TExecutionOptions::EExecutionCase::YqlScript},
372+
{"async", TExecutionOptions::EExecutionCase::AsyncQuery}
355373
});
356374
options.AddLongOption('C', "execution-case", "Type of query for -p argument")
357375
.RequiredArgument("query-type")
@@ -361,6 +379,10 @@ class TMain : public TMainClassArgs {
361379
TString choice(option->CurValOrDef());
362380
ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice));
363381
});
382+
options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
383+
.RequiredArgument("uint")
384+
.DefaultValue(RunnerOptions.InFlightLimit)
385+
.StoreResult(&RunnerOptions.InFlightLimit);
364386

365387
TChoices<NKikimrKqp::EQueryAction> scriptAction({
366388
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},

ydb/tests/tools/kqprun/src/actors.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@ namespace {
1111
class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMock> {
1212
public:
1313
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
14-
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
15-
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
14+
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
1615
TProgressCallback progressCallback)
1716
: Request_(std::move(request))
1817
, Promise_(promise)
1918
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
2019
, ResultSizeLimit_(std::numeric_limits<i64>::max())
21-
, ResultSets_(resultSets)
2220
, ProgressCallback_(progressCallback)
2321
{
2422
if (resultRowsLimit) {
@@ -79,7 +77,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
7977
}
8078

8179
void Handle(NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
82-
Promise_.SetValue(std::move(ev));
80+
Promise_.SetValue(TQueryResponse{.Response = std::move(ev), .ResultSets = std::move(ResultSets_)});
8381
PassAway();
8482
}
8583

@@ -91,12 +89,12 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
9189

9290
private:
9391
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
94-
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
92+
NThreading::TPromise<TQueryResponse> Promise_;
9593
ui64 ResultRowsLimit_;
9694
ui64 ResultSizeLimit_;
97-
std::vector<Ydb::ResultSet>& ResultSets_;
98-
std::vector<ui64> ResultSetSizes_;
9995
TProgressCallback ProgressCallback_;
96+
std::vector<Ydb::ResultSet> ResultSets_;
97+
std::vector<ui64> ResultSetSizes_;
10098
};
10199

102100
class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
@@ -186,10 +184,9 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
186184
} // anonymous namespace
187185

188186
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
189-
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
190-
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
187+
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
191188
TProgressCallback progressCallback) {
192-
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback);
189+
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, progressCallback);
193190
}
194191

195192
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {

ydb/tests/tools/kqprun/src/actors.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44

55
namespace NKqpRun {
66

7+
struct TQueryResponse {
8+
NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response;
9+
std::vector<Ydb::ResultSet> ResultSets;
10+
};
11+
712
using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgress&)>;
813

914
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
10-
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
11-
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
15+
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
1216
TProgressCallback progressCallback);
1317

1418
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);

ydb/tests/tools/kqprun/src/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ struct TRunnerOptions {
5555
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;
5656
ETraceOptType TraceOptType = ETraceOptType::Disabled;
5757

58+
ui64 InFlightLimit = 0;
59+
5860
TYdbSetupSettings YdbSettings;
5961
};
6062

ydb/tests/tools/kqprun/src/kqp_runner.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include <library/cpp/colorizer/colors.h>
55
#include <library/cpp/json/json_reader.h>
66

7+
#include <util/system/condvar.h>
8+
79
#include <ydb/core/blob_depot/mon_main.h>
810
#include <ydb/core/fq/libs/compute/common/utils.h>
911

@@ -86,6 +88,34 @@ void PrintStatistics(const TString& fullStat, const THashMap<TString, i64>& flat
8688
//// TKqpRunner::TImpl
8789

8890
class TKqpRunner::TImpl {
91+
struct TAsyncState {
92+
ui64 OnStartRequest() {
93+
InFlight++;
94+
MaxInFlight = std::max(MaxInFlight, InFlight);
95+
return RequestId++;
96+
}
97+
98+
void OnRequestFinished(bool success) {
99+
InFlight--;
100+
if (success) {
101+
Completed++;
102+
} else {
103+
Failed++;
104+
}
105+
}
106+
107+
TString GetInfoString() const {
108+
return TStringBuilder() << "completed: " << Completed << ", failed: " << Failed << ", in flight: " << InFlight << ", max in flight: " << MaxInFlight << ", spend time: " << TInstant::Now() - Start;
109+
}
110+
111+
const TInstant Start = TInstant::Now();
112+
ui64 RequestId = 1;
113+
ui64 MaxInFlight = 0;
114+
ui64 InFlight = 0;
115+
ui64 Completed = 0;
116+
ui64 Failed = 0;
117+
};
118+
89119
public:
90120
enum class EQueryType {
91121
ScriptQuery,
@@ -163,6 +193,45 @@ class TKqpRunner::TImpl {
163193
return true;
164194
}
165195

196+
void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) {
197+
TGuard<TMutex> lock(Mutex_);
198+
199+
if (Options_.InFlightLimit && AsyncState_.InFlight >= Options_.InFlightLimit) {
200+
AwaitInFlight_.WaitI(Mutex_);
201+
}
202+
203+
ui64 requestId = AsyncState_.OnStartRequest();
204+
RunningQueries_[requestId] = YdbSetup_.QueryRequestAsync(query, action, traceId, nullptr).Subscribe([this, requestId](const NThreading::TFuture<NKqpRun::TQueryResult>& f) {
205+
TGuard<TMutex> lock(Mutex_);
206+
207+
auto response = f.GetValue().Response;
208+
AsyncState_.OnRequestFinished(response.IsSuccess());
209+
if (response.IsSuccess()) {
210+
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n";
211+
} else {
212+
Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.Status << ". " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.Issues.ToString() << CoutColors_.Default();
213+
}
214+
215+
if (AsyncState_.InFlight < Options_.InFlightLimit) {
216+
AwaitInFlight_.Signal();
217+
}
218+
if (!AsyncState_.InFlight) {
219+
AwaitFinish_.Signal();
220+
}
221+
RunningQueries_.erase(requestId);
222+
}).IgnoreResult();
223+
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n";
224+
}
225+
226+
void WaitAsyncQueries() {
227+
TGuard<TMutex> lock(Mutex_);
228+
229+
if (AsyncState_.InFlight) {
230+
Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for async queries..." << CoutColors_.Default() << Endl;
231+
AwaitFinish_.WaitI(Mutex_);
232+
}
233+
}
234+
166235
bool FetchScriptResults() {
167236
TYdbSetup::StopTraceOpt();
168237

@@ -373,6 +442,12 @@ class TKqpRunner::TImpl {
373442
TString ExecutionOperation_;
374443
TExecutionMeta ExecutionMeta_;
375444
std::vector<Ydb::ResultSet> ResultSets_;
445+
446+
TMutex Mutex_;
447+
TCondVar AwaitInFlight_;
448+
TCondVar AwaitFinish_;
449+
TAsyncState AsyncState_;
450+
std::unordered_map<ui64, NThreading::TFuture<void>> RunningQueries_;
376451
};
377452

378453

@@ -394,10 +469,18 @@ bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction act
394469
return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery);
395470
}
396471

472+
void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
473+
Impl_->ExecuteQueryAsync(query, action, traceId);
474+
}
475+
397476
bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
398477
return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery);
399478
}
400479

480+
void TKqpRunner::WaitAsyncQueries() {
481+
Impl_->WaitAsyncQueries();
482+
}
483+
401484
bool TKqpRunner::FetchScriptResults() {
402485
return Impl_->FetchScriptResults();
403486
}

ydb/tests/tools/kqprun/src/kqp_runner.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@ class TKqpRunner {
1616

1717
bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const;
1818

19+
void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const;
20+
1921
bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const;
2022

23+
void WaitAsyncQueries();
24+
2125
bool FetchScriptResults();
2226

2327
bool ForgetExecutionOperation();

ydb/tests/tools/kqprun/src/ydb_setup.cpp

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred
6060
TString YqlToken_;
6161
};
6262

63+
TRequestResult GetQueryResult(TQueryResponse response, TQueryMeta& meta, std::vector<Ydb::ResultSet>& resultSets) {
64+
resultSets = std::move(response.ResultSets);
65+
66+
auto queryOperationResponse = response.Response->Get()->Record.GetRef();
67+
const auto& responseRecord = queryOperationResponse.GetResponse();
68+
69+
meta.Ast = responseRecord.GetQueryAst();
70+
if (const auto& plan = responseRecord.GetQueryPlan()) {
71+
meta.Plan = plan;
72+
}
73+
74+
return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());
75+
}
76+
6377
} // anonymous namespace
6478

6579

@@ -218,20 +232,20 @@ class TYdbSetup::TImpl {
218232
return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvScriptRequest, NKikimr::NKqp::TEvKqp::TEvScriptResponse>(std::move(event));
219233
}
220234

221-
NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector<Ydb::ResultSet>& resultSets, TProgressCallback progressCallback) const {
235+
NThreading::TFuture<TQueryResponse> QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const {
222236
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest>();
223237
FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record);
224238

225239
if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) {
226240
event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs));
227241
}
228242

229-
auto promise = NThreading::NewPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr>();
243+
auto promise = NThreading::NewPromise<TQueryResponse>();
230244
auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit();
231245
auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit();
232-
GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, progressCallback));
246+
GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, progressCallback), RandomNumber(Settings_.NodeCount));
233247

234-
return promise.GetFuture().GetValueSync();
248+
return promise.GetFuture();
235249
}
236250

237251
NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
@@ -256,7 +270,7 @@ class TYdbSetup::TImpl {
256270
auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit();
257271
NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max());
258272

259-
GetRuntime()->Register(fetchActor);
273+
GetRuntime()->Register(fetchActor, RandomNumber(Settings_.NodeCount));
260274

261275
return GetRuntime()->GrabEdgeEvent<NKikimr::NKqp::TEvFetchScriptResultsResponse>(edgeActor);
262276
}
@@ -288,7 +302,7 @@ class TYdbSetup::TImpl {
288302
template <typename TRequest, typename TResponse>
289303
typename TResponse::TPtr RunKqpProxyRequest(THolder<TRequest> event) const {
290304
NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor();
291-
NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId());
305+
NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount)));
292306

293307
GetRuntime()->Send(kqpProxy, edgeActor, event.Release());
294308

@@ -378,17 +392,16 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer
378392
}
379393

380394
TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector<Ydb::ResultSet>& resultSets, TProgressCallback progressCallback) const {
381-
resultSets.clear();
382-
383-
auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, progressCallback)->Get()->Record.GetRef();
384-
const auto& responseRecord = queryOperationResponse.GetResponse();
385-
386-
meta.Ast = responseRecord.GetQueryAst();
387-
if (const auto& plan = responseRecord.GetQueryPlan()) {
388-
meta.Plan = plan;
389-
}
395+
auto response = Impl_->QueryRequestAsync(query, action, traceId, progressCallback).GetValueSync();
396+
return GetQueryResult(std::move(response), meta, resultSets);
397+
}
390398

391-
return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());
399+
NThreading::TFuture<TQueryResult> TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const {
400+
return Impl_->QueryRequestAsync(query, action, traceId, progressCallback).Apply([](const NThreading::TFuture<TQueryResponse>& f) {
401+
TQueryResult result;
402+
result.Response = GetQueryResult(f.GetValue(), result.Meta, result.ResultSets);
403+
return result;
404+
});
392405
}
393406

394407
TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector<Ydb::ResultSet>& resultSets) const {

0 commit comments

Comments
 (0)