Skip to content

Commit 8aeea7e

Browse files
authored
Merge d07f9a2 into 02fdbe3
2 parents 02fdbe3 + d07f9a2 commit 8aeea7e

File tree

3 files changed

+192
-96
lines changed

3 files changed

+192
-96
lines changed

ydb/core/fq/libs/actors/pending_fetcher.cpp

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,25 +358,51 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
358358
const TString folderId = NYdb::NFq::TScope(task.scope()).ParseFolder();
359359
const TString cloudId = task.sensor_labels().at("cloud_id");
360360
const TString queryId = task.query_id().value();
361+
const bool isStreaming = task.query_type() == FederatedQuery::QueryContent::STREAMING;
362+
TString queryIdLabel;
363+
// todo: sanitize query name
364+
TString queryNameLabel = task.query_name();
365+
if (task.automatic()) {
366+
queryIdLabel = isStreaming ? "streaming" : "analytics";
367+
} else if (isStreaming) {
368+
queryIdLabel = queryId;
369+
} else {
370+
queryIdLabel = "manual";
371+
}
361372

362373
::NYql::NCommon::TServiceCounters queryCounters(ServiceCounters);
363374
auto publicCountersParent = ServiceCounters.PublicCounters;
364375

365376
if (cloudId && folderId) {
366377
publicCountersParent = publicCountersParent->GetSubgroup("cloud_id", cloudId)->GetSubgroup("folder_id", folderId);
367378
}
368-
queryCounters.PublicCounters = publicCountersParent->GetSubgroup("query_id",
369-
task.automatic() ? (task.query_name() ? task.query_name() : "automatic") : queryId);
379+
380+
::NMonitoring::TDynamicCounterPtr queryPublicCounters = publicCountersParent;
381+
// use original query id here
382+
queryPublicCounters = queryPublicCounters->GetSubgroup("query_id", queryId);
383+
384+
if (queryNameLabel) {
385+
queryPublicCounters = queryPublicCounters->GetSubgroup("query_name", queryNameLabel);
386+
}
387+
queryCounters.PublicCounters = queryPublicCounters;
370388

371389
auto rootCountersParent = ServiceCounters.RootCounters;
372390
std::set<std::pair<TString, TString>> sensorLabels(task.sensor_labels().begin(), task.sensor_labels().end());
373391
for (const auto& [label, item]: sensorLabels) {
374392
rootCountersParent = rootCountersParent->GetSubgroup(label, item);
375393
}
376394

377-
queryCounters.RootCounters = rootCountersParent->GetSubgroup("query_id",
378-
task.automatic() ? (folderId ? "automatic_" + folderId : "automatic") : queryId);
379-
queryCounters.Counters = queryCounters.RootCounters;
395+
::NMonitoring::TDynamicCounterPtr queryRootCounters = rootCountersParent;
396+
if (queryIdLabel) {
397+
queryRootCounters = queryRootCounters->GetSubgroup("query_id", queryIdLabel);
398+
}
399+
400+
if (!task.automatic() && isStreaming && queryNameLabel) {
401+
queryRootCounters = queryRootCounters->GetSubgroup("query_name", queryNameLabel);
402+
}
403+
404+
queryCounters.RootCounters = queryRootCounters;
405+
queryCounters.Counters = queryRootCounters;
380406

381407
queryCounters.InitUptimeCounter();
382408
const auto createdAt = TInstant::Now();

ydb/tests/fq/s3/test_public_metrics.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,33 +56,34 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
5656
)
5757
logging.debug(str(metrics))
5858

59+
query_id_label = "manual"
5960
assert (
6061
metrics.find_sensor(
61-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}
62+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.running_tasks"}
6263
)
6364
>= 0
6465
)
6566
assert (
6667
metrics.find_sensor(
67-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}
68+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.cpu_usage_us"}
6869
)
6970
>= 0
7071
)
7172
assert (
7273
metrics.find_sensor(
73-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.memory_usage_bytes"}
74+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.memory_usage_bytes"}
7475
)
7576
> 0
7677
)
7778
assert (
7879
metrics.find_sensor(
79-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}
80+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.input_bytes"}
8081
)
8182
> 0
8283
)
8384
assert (
8485
metrics.find_sensor(
85-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}
86+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.uptime_seconds"}
8687
)
8788
>= 0
8889
)
@@ -91,7 +92,7 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
9192
{
9293
"cloud_id": cloud_id,
9394
"folder_id": folder_id,
94-
"query_id": query_id,
95+
"query_id": query_id_label,
9596
"name": "query.source_input_records",
9697
}
9798
)
@@ -103,15 +104,15 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
103104
{
104105
"cloud_id": cloud_id,
105106
"folder_id": folder_id,
106-
"query_id": query_id,
107+
"query_id": query_id_label,
107108
"name": "query.sink_output_records",
108109
}
109110
)
110111
is None
111112
)
112113
assert (
113114
metrics.find_sensor(
114-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}
115+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.output_bytes"}
115116
)
116117
is None
117118
)
@@ -121,15 +122,15 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
121122
{
122123
"cloud_id": cloud_id,
123124
"folder_id": folder_id,
124-
"query_id": query_id,
125+
"query_id": query_id_label,
125126
"name": "query.sink_output_records",
126127
}
127128
)
128129
== 0
129130
)
130131
assert (
131132
metrics.find_sensor(
132-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}
133+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.output_bytes"}
133134
)
134135
== 0
135136
)
@@ -156,37 +157,37 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
156157

157158
assert (
158159
metrics.find_sensor(
159-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}
160+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.running_tasks"}
160161
)
161162
>= 0
162163
)
163164
assert (
164165
metrics.find_sensor(
165-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}
166+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.cpu_usage_us"}
166167
)
167168
>= 0
168169
)
169170
assert (
170171
metrics.find_sensor(
171-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.memory_usage_bytes"}
172+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.memory_usage_bytes"}
172173
)
173174
> 0
174175
)
175176
assert (
176177
metrics.find_sensor(
177-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}
178+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.input_bytes"}
178179
)
179180
> 0
180181
)
181182
assert (
182183
metrics.find_sensor(
183-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}
184+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.uptime_seconds"}
184185
)
185186
>= 0
186187
)
187188
assert (
188189
metrics.find_sensor(
189-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}
190+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.output_bytes"}
190191
)
191192
> 0
192193
)
@@ -195,7 +196,7 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
195196
{
196197
"cloud_id": cloud_id,
197198
"folder_id": folder_id,
198-
"query_id": query_id,
199+
"query_id": query_id_label,
199200
"name": "query.source_input_records",
200201
}
201202
)
@@ -206,7 +207,7 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
206207
{
207208
"cloud_id": cloud_id,
208209
"folder_id": folder_id,
209-
"query_id": query_id,
210+
"query_id": query_id_label,
210211
"name": "query.sink_output_records",
211212
}
212213
)

0 commit comments

Comments
 (0)