Skip to content

Commit a25138b

Browse files
authored
Merge d89bf3a into 9c831b2
2 parents 9c831b2 + d89bf3a commit a25138b

File tree

3 files changed

+55
-25
lines changed

3 files changed

+55
-25
lines changed

ydb/core/fq/libs/actors/pending_fetcher.cpp

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,24 +358,52 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
358358
const TString folderId = NYdb::NFq::TScope(task.scope()).ParseFolder();
359359
const TString cloudId = task.sensor_labels().at("cloud_id");
360360
const TString queryId = task.query_id().value();
361+
const bool isStreaming = task.query_type() == FederatedQuery::QueryContent::STREAMING;
362+
TString queryIdLabel;
363+
TString queryNameLabel;
364+
if (task.automatic()) {
365+
queryIdLabel = isStreaming ? "streaming" : "analytics";
366+
} else {
367+
if (isStreaming) {
368+
// todo: sanitize query name
369+
queryNameLabel = task.query_name();
370+
queryIdLabel = queryId;
371+
} else {
372+
queryIdLabel = "manual";
373+
}
374+
}
361375

362376
::NYql::NCommon::TServiceCounters queryCounters(ServiceCounters);
363377
auto publicCountersParent = ServiceCounters.PublicCounters;
364378

365379
if (cloudId && folderId) {
366380
publicCountersParent = publicCountersParent->GetSubgroup("cloud_id", cloudId)->GetSubgroup("folder_id", folderId);
367381
}
368-
queryCounters.PublicCounters = publicCountersParent->GetSubgroup("query_id",
369-
task.automatic() ? (task.query_name() ? task.query_name() : "automatic") : queryId);
382+
383+
if (queryIdLabel) {
384+
publicCountersParent = publicCountersParent->GetSubgroup("query_id", queryIdLabel);
385+
}
386+
387+
if (queryNameLabel) {
388+
publicCountersParent = publicCountersParent->GetSubgroup("query_name", queryNameLabel);
389+
}
390+
queryCounters.PublicCounters = publicCountersParent;
370391

371392
auto rootCountersParent = ServiceCounters.RootCounters;
372393
std::set<std::pair<TString, TString>> sensorLabels(task.sensor_labels().begin(), task.sensor_labels().end());
373394
for (const auto& [label, item]: sensorLabels) {
374395
rootCountersParent = rootCountersParent->GetSubgroup(label, item);
375396
}
376397

377-
queryCounters.RootCounters = rootCountersParent->GetSubgroup("query_id",
378-
task.automatic() ? (folderId ? "automatic_" + folderId : "automatic") : queryId);
398+
if (queryIdLabel) {
399+
rootCountersParent = rootCountersParent->GetSubgroup("query_id", queryIdLabel);
400+
}
401+
402+
if (queryNameLabel) {
403+
rootCountersParent = rootCountersParent->GetSubgroup("query_name", queryNameLabel);
404+
}
405+
406+
queryCounters.RootCounters = rootCountersParent;
379407
queryCounters.Counters = queryCounters.RootCounters;
380408

381409
queryCounters.InitUptimeCounter();

ydb/tests/fq/s3/test_public_metrics.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,33 +56,34 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
5656
)
5757
logging.debug(str(metrics))
5858

59+
query_id_label = "manual"
5960
assert (
6061
metrics.find_sensor(
61-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}
62+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.running_tasks"}
6263
)
6364
>= 0
6465
)
6566
assert (
6667
metrics.find_sensor(
67-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}
68+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.cpu_usage_us"}
6869
)
6970
>= 0
7071
)
7172
assert (
7273
metrics.find_sensor(
73-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.memory_usage_bytes"}
74+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.memory_usage_bytes"}
7475
)
7576
> 0
7677
)
7778
assert (
7879
metrics.find_sensor(
79-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}
80+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.input_bytes"}
8081
)
8182
> 0
8283
)
8384
assert (
8485
metrics.find_sensor(
85-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}
86+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.uptime_seconds"}
8687
)
8788
>= 0
8889
)
@@ -91,7 +92,7 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
9192
{
9293
"cloud_id": cloud_id,
9394
"folder_id": folder_id,
94-
"query_id": query_id,
95+
"query_id": query_id_label,
9596
"name": "query.source_input_records",
9697
}
9798
)
@@ -103,15 +104,15 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
103104
{
104105
"cloud_id": cloud_id,
105106
"folder_id": folder_id,
106-
"query_id": query_id,
107+
"query_id": query_id_label,
107108
"name": "query.sink_output_records",
108109
}
109110
)
110111
is None
111112
)
112113
assert (
113114
metrics.find_sensor(
114-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}
115+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.output_bytes"}
115116
)
116117
is None
117118
)
@@ -121,15 +122,15 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
121122
{
122123
"cloud_id": cloud_id,
123124
"folder_id": folder_id,
124-
"query_id": query_id,
125+
"query_id": query_id_label,
125126
"name": "query.sink_output_records",
126127
}
127128
)
128129
== 0
129130
)
130131
assert (
131132
metrics.find_sensor(
132-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}
133+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.output_bytes"}
133134
)
134135
== 0
135136
)
@@ -156,37 +157,37 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
156157

157158
assert (
158159
metrics.find_sensor(
159-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}
160+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.running_tasks"}
160161
)
161162
>= 0
162163
)
163164
assert (
164165
metrics.find_sensor(
165-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}
166+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.cpu_usage_us"}
166167
)
167168
>= 0
168169
)
169170
assert (
170171
metrics.find_sensor(
171-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.memory_usage_bytes"}
172+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.memory_usage_bytes"}
172173
)
173174
> 0
174175
)
175176
assert (
176177
metrics.find_sensor(
177-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}
178+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.input_bytes"}
178179
)
179180
> 0
180181
)
181182
assert (
182183
metrics.find_sensor(
183-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}
184+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.uptime_seconds"}
184185
)
185186
>= 0
186187
)
187188
assert (
188189
metrics.find_sensor(
189-
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}
190+
{"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id_label, "name": "query.output_bytes"}
190191
)
191192
> 0
192193
)
@@ -195,7 +196,7 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
195196
{
196197
"cloud_id": cloud_id,
197198
"folder_id": folder_id,
198-
"query_id": query_id,
199+
"query_id": query_id_label,
199200
"name": "query.source_input_records",
200201
}
201202
)
@@ -206,7 +207,7 @@ def test_public_metrics(self, kikimr, s3, client, yq_version, unique_prefix):
206207
{
207208
"cloud_id": cloud_id,
208209
"folder_id": folder_id,
209-
"query_id": query_id,
210+
"query_id": query_id_label,
210211
"name": "query.sink_output_records",
211212
}
212213
)

ydb/tests/fq/yds/test_metrics_cleanup.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ class TestCleanup(TestYdsBase):
1515
@yq_v1
1616
def test_cleanup(self, kikimr, client):
1717
sql = "SELECT 1;"
18-
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
18+
query_name = "simple"
19+
query_id = client.create_query(query_name, sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
1920
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
2021

2122
assert kikimr.compute_plane.get_task_count(1, query_id) == 0
2223
deadline = time.time() + plain_or_under_sanitizer(120, 500)
2324
while True:
2425
value = kikimr.compute_plane.get_sensors(1, "yq").find_sensor(
25-
{"query_id": query_id, "subsystem": "task_controller", "Stage": "Total", "sensor": "Tasks"}
26+
{"query_id": query_name, "subsystem": "task_controller", "Stage": "Total", "sensor": "Tasks"}
2627
)
2728
if value is None:
2829
break
@@ -47,7 +48,7 @@ def test_keep(self, kikimr, client):
4748
deadline = time.time() + 90 # x1.5 of 60 sec
4849
while True:
4950
value = kikimr.compute_plane.get_sensors(1, "yq").find_sensor(
50-
{"query_id": query_id, "subsystem": "task_controller", "Stage": "Total", "sensor": "Tasks"}
51+
{"query_id": "simple", "subsystem": "task_controller", "Stage": "Total", "sensor": "Tasks"}
5152
)
5253
assert value is not None, "Tasks was cleaned"
5354
if time.time() > deadline:

0 commit comments

Comments
 (0)