Skip to content

Commit 756e3e4

Browse files
authored
Merge 42d8267 into 1091a58
2 parents 1091a58 + 42d8267 commit 756e3e4

File tree

19 files changed

+85
-25
lines changed

19 files changed

+85
-25
lines changed

ydb/core/grpc_services/rpc_kqp_base.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ void FillQueryStats(Ydb::TableStats::QueryStats& queryStats, const NKqpProto::TK
6767
queryStats.set_process_cpu_time_us(kqpStats.GetWorkerCpuTimeUs());
6868
queryStats.set_total_cpu_time_us(totalCpuTimeUs);
6969
queryStats.set_total_duration_us(kqpStats.GetDurationUs());
70+
queryStats.set_queued_time_us(kqpStats.GetQueuedTimeUs());
7071
}
7172

7273
void FillQueryStats(Ydb::TableStats::QueryStats& queryStats, const NKikimrKqp::TQueryResponse& kqpResponse) {

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class TKqpQueryState : public TNonCopyable {
114114
bool IsDocumentApiRestricted_ = false;
115115

116116
TInstant StartTime;
117+
TInstant ContinueTime;
117118
NYql::TKikimrQueryDeadlines QueryDeadlines;
118119
TKqpQueryStats QueryStats;
119120
bool KeepSession = false;

ydb/core/kqp/session_actor/kqp_query_stats.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
210210
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
211211
NKqpProto::TKqpStatsQuery result;
212212
result.SetDurationUs(DurationUs);
213+
result.SetQueuedTimeUs(QueuedTimeUs);
213214

214215
if (Compilation) {
215216
result.MutableCompilation()->SetFromCache(Compilation->FromCache);

ydb/core/kqp/session_actor/kqp_query_stats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NKikimr::NKqp {
88

99
struct TKqpQueryStats {
1010
ui64 DurationUs = 0;
11+
ui64 QueuedTimeUs = 0;
1112
std::optional<TKqpStatsCompile> Compilation;
1213

1314
ui64 WorkerCpuTimeUs = 0;

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
475475

476476
void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
477477
YQL_ENSURE(QueryState);
478+
QueryState->ContinueTime = TInstant::Now();
478479

479480
if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
480481
LOG_T("Failed to place request in resource pool, feature flag is disabled");
@@ -1552,6 +1553,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15521553

15531554
stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
15541555
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
1556+
if (const auto continueTime = QueryState->ContinueTime) {
1557+
stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
1558+
}
15551559
if (QueryState->CompileResult) {
15561560
stats->Compilation.emplace();
15571561
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,15 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
245245
auto db = kikimr.GetQueryClient();
246246

247247
TExecuteQuerySettings settings;
248+
settings.StatsMode(EStatsMode::Full);
248249

249250
{ // Existing pool
250251
settings.PoolId("default");
251252

252253
const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key";
253254
auto result = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
254255
CheckQueryResult(result);
256+
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStats()->GetQueuedTime(), TDuration::Zero());
255257
}
256258

257259
{ // Not existing pool (check workload manager enabled)

ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,16 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
109109
auto db = kikimr.GetQueryClient();
110110

111111
TExecuteScriptSettings settings;
112+
settings.StatsMode(EStatsMode::Full);
112113

113114
{ // Existing pool
114115
settings.PoolId("default");
115116

116117
auto scripOp = db.ExecuteScript("SELECT 42", settings).ExtractValueSync();
117118
UNIT_ASSERT_VALUES_EQUAL_C(scripOp.Status().GetStatus(), EStatus::SUCCESS, scripOp.Status().GetIssues().ToString());
118-
CheckScriptResults(scripOp, WaitScriptExecutionOperation(scripOp.Id(), kikimr.GetDriver()), db);
119+
auto readyOp = WaitScriptExecutionOperation(scripOp.Id(), kikimr.GetDriver());
120+
CheckScriptResults(scripOp, readyOp, db);
121+
UNIT_ASSERT_VALUES_UNEQUAL(readyOp.Metadata().ExecStats.GetQueuedTime(), TDuration::Zero());
119122
}
120123

121124
{ // Not existing pool (check workload manager enabled)

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
5454
UpdateConfigCounters(poolConfig);
5555
}
5656

57+
void CollectRequestLatency(TInstant continueTime) {
58+
if (continueTime) {
59+
RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds());
60+
}
61+
}
62+
5763
void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
5864
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
5965
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
@@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
106112
const TActorId WorkerActorId;
107113
const TString SessionId;
108114
const TInstant StartTime = TInstant::Now();
115+
TInstant ContinueTime;
109116

110117
EState State = EState::Pending;
111118
bool Started = false; // after TEvContinueRequest success
@@ -267,6 +274,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
267274
if (status == Ydb::StatusIds::SUCCESS) {
268275
LocalInFlight++;
269276
request->Started = true;
277+
request->ContinueTime = TInstant::Now();
270278
Counters.LocalInFly->Inc();
271279
Counters.ContinueOk->Inc();
272280
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
@@ -387,7 +395,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
387395

388396
if (status == Ydb::StatusIds::SUCCESS) {
389397
Counters.CleanupOk->Inc();
390-
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
398+
Counters.CollectRequestLatency(request->ContinueTime);
391399
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
392400
} else {
393401
Counters.CleanupError->Inc();
@@ -401,7 +409,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
401409
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());
402410

403411
Counters.Cancelled->Inc();
404-
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
412+
Counters.CollectRequestLatency(request->ContinueTime);
405413
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
406414
}
407415

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
487487
request->SetQuery(query);
488488
request->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
489489
request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
490+
request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL);
490491
request->SetDatabase(Settings_.DomainName_);
491492
request->SetPoolId(settings.PoolId_);
492493

@@ -567,6 +568,10 @@ const std::vector<Ydb::ResultSet>& TQueryRunnerResult::GetResultSets() const {
567568
return ResultSets;
568569
}
569570

571+
const NKqpProto::TKqpStatsQuery& TQueryRunnerResult::GetQueryStats() const {
572+
return Response.GetResponse().GetQueryStats();
573+
}
574+
570575
//// TQueryRunnerResultAsync
571576

572577
TQueryRunnerResult TQueryRunnerResultAsync::GetResult(TDuration timeout) const {

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ struct TQueryRunnerResult {
4444

4545
const Ydb::ResultSet& GetResultSet(size_t resultIndex) const;
4646
const std::vector<Ydb::ResultSet>& GetResultSets() const;
47+
const NKqpProto::TKqpStatsQuery& GetQueryStats() const;
4748
};
4849

4950
struct TQueryRunnerResultAsync {

0 commit comments

Comments
 (0)