import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.models.param import Param
from airflow.operators.branch import BaseBranchOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.weekday import WeekDay
from kubernetes.client import models as k8s
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
DOCS = """\
# Probe Scraper

*Triage notes*

As long as the most recent DAG run is successful this job can be considered healthy.
In that case, past DAG failures can be ignored.

## Debugging failures

probe_scraper and probe_scraper_moz_central task logs aren't available via the Airflow web console.
In order to access them, go to [GCP Logs Explorer](https://cloudlogging.app.goo.gl/sLyJuaPmVM6SnKtu7).
This link should get you directly to the last 12 hours of probe_scraper pod logs. If necessary, replace
`"probe-scraper.+"` with `"probe-scraper-moz-central.+"` in the query field.

If the above link fails, do the following:

1. Navigate to the [Google Cloud Logging console](https://console.cloud.google.com/logs/query?project=moz-fx-data-airflow-gke-prod).
   If you can't access these logs but think you should be able to, [contact Data SRE](https://mana.mozilla.org/wiki/pages/viewpage.action?spaceKey=DOPS&title=Contacting+Data+SRE).
2. Search for the following, replacing `"probe-scraper.+"` with `"probe-scraper-moz-central.+"` if necessary (make sure to put this in the raw query field - you might need to click the "Show query" button for it to appear):

```
resource.type="k8s_container"
resource.labels.project_id="moz-fx-data-airflow-gke-prod"
resource.labels.location="us-west1"
resource.labels.cluster_name="workloads-prod-v1"
resource.labels.namespace_name="default"
resource.labels.pod_name=~"probe-scraper.+"
severity>=DEFAULT
```

Adjust the time window as needed and you should be able to see logs associated with the failure.

To find the name of the pod related to a specific run, navigate to the
[probe_scraper DAG in Airflow](https://workflow.telemetry.mozilla.org/tree?dag_id=probe_scraper),
click the task that failed, followed by `View Log`. There, look for `probe-scraper.[ID]`.
"""

DEFAULT_LOOKML_GENERATOR_IMAGE_VERSION = "v1.17.0"

default_args = {
    "owner": "akomar@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2019, 10, 28),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
}

tags = [Tag.ImpactTier.tier_1]

aws_access_key_secret = Secret(
    deploy_type="env",
    deploy_target="AWS_ACCESS_KEY_ID",
    secret="airflow-gke-secrets",
    key="probe_scraper_secret__aws_access_key",
)
aws_secret_key_secret = Secret(
    deploy_type="env",
    deploy_target="AWS_SECRET_ACCESS_KEY",
    secret="airflow-gke-secrets",
    key="probe_scraper_secret__aws_secret_key",
)
mozilla_pipeline_schemas_secret_git_sshkey_b64 = Secret(
    deploy_type="env",
    deploy_target="MPS_SSH_KEY_BASE64",
    secret="airflow-gke-secrets",
    key="probe_scraper_secret__mozilla_pipeline_schemas_secret_git_sshkey_b64",
)
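
# The "update" DAG param controls how the Glean scraping tasks below run: per their
# inline comments, setting it to False switches them to "--no-update" with an empty
# "--glean-limit-date". To override it, trigger the DAG manually with a conf payload,
# e.g. (illustrative command, not part of this file's automation):
#   airflow dags trigger probe_scraper --conf '{"update": false}'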
with DAG(
    "probe_scraper",
    doc_md=DOCS,
    default_args=default_args,
    params={"update": Param(True, type="boolean")},
    schedule_interval="0 0 * * *",
    tags=tags,
) as dag:
    airflow_gke_prod_kwargs = {
        "gcp_conn_id": "google_cloud_airflow_gke",
        "project_id": "moz-fx-data-airflow-gke-prod",
        "location": "us-west1",
        "cluster_name": "workloads-prod-v1",
    }

    # Built from repo https://github.com/mozilla/probe-scraper
    probe_scraper_image = "gcr.io/moz-fx-data-airflow-prod-88e0/probe-scraper:latest"

    # probe scraper used to be a single task, but it has been split up, and individual
    # failures do not block downstream tasks
    probe_scraper = EmptyOperator(
        task_id="probe_scraper",
        trigger_rule="all_done",
        dag=dag,
    )
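
    # Arguments shared by every probe_scraper.runner invocation below: write output
    # to the prod artifacts bucket and keep a local scrape cache inside the container.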
    probe_scraper_base_arguments = [
        "python3",
        "-m",
        "probe_scraper.runner",
        "--out-dir=/app/probe_data",
        "--cache-dir=/app/probe_cache",
        "--output-bucket=gs://probe-scraper-prod-artifacts/",
        "--env=prod",
    ]

    probe_scraper_moz_central = GKEPodOperator(
        task_id="probe_scraper_moz_central",
        name="probe-scraper-moz-central",
        # Needed for proper cluster autoscaling, because cluster autoscaling
        # works on pod resource requests, instead of usage
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "4500Mi"},
        ),
        # Due to the nature of the container run, we set get_logs to False, to avoid
        # urllib3.exceptions.ProtocolError: 'Connection broken: IncompleteRead(0 bytes
        # read)' errors where the pod continues to run, but airflow loses its connection
        # and sets the status to Failed
        get_logs=False,
        # Give additional time since the cluster may scale up when running this job
        startup_timeout_seconds=360,
        image=probe_scraper_image,
        arguments=(
            [
                *probe_scraper_base_arguments,
                "--cache-bucket=gs://probe-scraper-prod-cache/",
                "--moz-central",
            ]
        ),
        email=[
            "telemetry-alerts@mozilla.com",
            "telemetry-client-dev@mozilla.com",
            "aplacitelli@mozilla.com",
            "dataops+alerts@mozilla.com",
            "akomar@mozilla.com",
        ],
        env_vars={"BOTO_PATH": ".gce_boto"},
        dag=dag,
        **airflow_gke_prod_kwargs,
    )

    probe_scraper_moz_central >> probe_scraper

    probe_scraper_glean = [
        GKEPodOperator(
            task_id=f"probe_scraper_glean_{name.replace('-', '_')}",
            name=f"probe-scraper-glean-{name}",
            image=probe_scraper_image,
            arguments=(
                [
                    *probe_scraper_base_arguments,
                    "--glean",
                    f"--glean-url={url}",
                    # if dag param update has been manually set to False, use
                    # "--glean-limit-date=", "--no-update", otherwise default to
                    # "--glean-limit-date={{ds}}", "--update"
                    "--glean-limit-date={{ds if dag_run.conf['update'] else ''}}",
                    "--{{'' if dag_run.conf['update'] else 'no-'}}update",
                ]
                + (
                    [
                        "--bugzilla-api-key",
                        "{{ var.value.bugzilla_probe_expiry_bot_api_key }}",
                    ]
                    if name == "gecko-dev"
                    else []
                )
            ),
            email=[
                "telemetry-alerts@mozilla.com",
                "telemetry-client-dev@mozilla.com",
                "aplacitelli@mozilla.com",
                "dataops+alerts@mozilla.com",
                "akomar@mozilla.com",
            ],
            env_vars={
                "BOTO_PATH": ".gce_boto",
            },
            secrets=[aws_access_key_secret, aws_secret_key_secret],
            dag=dag,
            **airflow_gke_prod_kwargs,
        )
        for name, url in (
            ("gecko-dev", "https://github.com/mozilla/gecko-dev"),
            ("phabricator", "https://github.com/mozilla-conduit/review"),
            ("releases-comm-central", "https://github.com/mozilla/releases-comm-central"),
        )
    ]

    probe_scraper_glean >> probe_scraper

    probe_scraper_glean_repositories = GKEPodOperator(
        task_id="probe_scraper_glean_repositories",
        name="probe-scraper-glean-repositories",
        image=probe_scraper_image,
        arguments=(
            [
                *probe_scraper_base_arguments,
                # when --update is specified without --glean-repo or --glean-url,
                # this only writes metadata changes.
                "--update",
                "--glean",
            ]
        ),
        email=[
            "telemetry-alerts@mozilla.com",
            "telemetry-client-dev@mozilla.com",
            "aplacitelli@mozilla.com",
            "dataops+alerts@mozilla.com",
            "akomar@mozilla.com",
        ],
        env_vars={"BOTO_PATH": ".gce_boto"},
        dag=dag,
        **airflow_gke_prod_kwargs,
    )

    probe_scraper_glean_repositories >> probe_scraper_glean

    probe_scraper_checks = [
        GKEPodOperator(
            task_id=f"probe_scraper_{check_name.replace('-', '_')}",
            name=f"probe-scraper-{check_name}",
            image=probe_scraper_image,
            arguments=(
                [
                    *probe_scraper_base_arguments,
                    f"--{check_name}",
                    "--bugzilla-api-key={{ var.value.bugzilla_probe_expiry_bot_api_key }}",
                    # don't write any generated files, this job is for emails only
                    "--env=dev",
                    # specify --update without --glean-repo or --glean-url to not scrape any
                    # repos, and download probe data from --output-bucket for expiry checks
                    "--update",
                    "--glean",
                ]
            ),
            email=[
                "telemetry-alerts@mozilla.com",
                "telemetry-client-dev@mozilla.com",
                "aplacitelli@mozilla.com",
                "dataops+alerts@mozilla.com",
                "akomar@mozilla.com",
            ],
            env_vars={
                "BOTO_PATH": ".gce_boto",
            },
            secrets=[aws_access_key_secret, aws_secret_key_secret],
            dag=dag,
            **airflow_gke_prod_kwargs,
        )
        for check_name in ("check-expiry", "check-fog-expiry")
    ]
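
    # No-op branch target used on days when neither expiry check is scheduled to run
    # (see CheckBranchOperator below).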
    dummy_branch = EmptyOperator(
        task_id="dummy_branch",
        dag=dag,
    )

    class CheckBranchOperator(BaseBranchOperator):
        def choose_branch(self, context):
            """
            Return an array of task_ids to be executed.

            These tasks must be downstream of the branch task.
            """
            weekday = context["execution_date"].isoweekday()
            if weekday == WeekDay.MONDAY:
                return ["probe_scraper_check_expiry"]
            elif weekday == WeekDay.WEDNESDAY:
                return ["probe_scraper_check_fog_expiry"]
            else:
                return ["dummy_branch"]

    check_branch = CheckBranchOperator(
        task_id="probe_scraper_check_branch",
        # wait for upstream, but ignore upstream failures
        trigger_rule="all_done",
        dag=dag,
    )

    check_branch >> [*probe_scraper_checks, dummy_branch]
    probe_scraper >> check_branch
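
    # Regenerates mozilla-pipeline-schemas, presumably consuming the freshly scraped
    # probe data: per the MPS_* env vars, schemas are generated from the `main` branch
    # and pushed to the `generated-schemas` branch of mozilla-pipeline-schemas.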
    schema_generator = GKEPodOperator(
        email=[
            "akomar@mozilla.com",
            "dataops+alerts@mozilla.com",
            "telemetry-alerts@mozilla.com",
        ],
        task_id="mozilla_schema_generator",
        name="schema-generator-1",
        image="mozilla/mozilla-schema-generator:latest",
        env_vars={
            "MPS_REPO_URL": "git@github.com:mozilla-services/mozilla-pipeline-schemas.git",
            "MPS_BRANCH_SOURCE": "main",
            "MPS_BRANCH_PUBLISH": "generated-schemas",
        },
        secrets=[mozilla_pipeline_schemas_secret_git_sshkey_b64],
        dag=dag,
    )

    schema_generator.set_upstream(probe_scraper)
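
    # Runs probe_scraper.probe_expiry_alert for the run date, using the Bugzilla API
    # key to notify owners of soon-to-expire probes (presumably by filing or updating
    # Bugzilla bugs).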
    probe_expiry_alerts = GKEPodOperator(
        task_id="probe-expiry-alerts",
        name="probe-expiry-alerts",
        image=probe_scraper_image,
        arguments=[
            "python3",
            "-m",
            "probe_scraper.probe_expiry_alert",
            "--date",
            "{{ ds }}",
            "--bugzilla-api-key",
            "{{ var.value.bugzilla_probe_expiry_bot_api_key }}",
        ],
        email=["akomar@mozilla.com", "telemetry-alerts@mozilla.com"],
        secrets=[aws_access_key_secret, aws_secret_key_secret],
        dag=dag,
    )

    probe_expiry_alerts.set_upstream(probe_scraper)
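
    # ExternalTaskSensor subtracts execution_delta from this DAG's execution date, so
    # the -2 hour delta targets the bqetl_monitoring run dated two hours *after* this
    # DAG's midnight run.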
    wait_for_table_partition_expirations = ExternalTaskSensor(
        task_id="wait_for_table_partition_expirations",
        external_dag_id="bqetl_monitoring",
        external_task_id="monitoring_derived__table_partition_expirations__v1",
        execution_delta=timedelta(hours=-2),
        mode="reschedule",
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
        dag=dag,
    )
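
    # Runs probe_scraper.ping_expiry_alert for the run date; it depends on both the
    # scraped probe data and the table partition expiration data waited on above,
    # presumably to flag pings whose data is about to expire.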
    ping_expiry_alerts = GKEPodOperator(
        task_id="ping_expiry_alerts",
        image=probe_scraper_image,
        arguments=[
            "python3",
            "-m",
            "probe_scraper.ping_expiry_alert",
            "--run-date",
            "{{ ds }}",
        ],
        owner="bewu@mozilla.com",
        email=["bewu@mozilla.com", "telemetry-alerts@mozilla.com"],
        secrets=[aws_access_key_secret, aws_secret_key_secret],
        dag=dag,
    )

    ping_expiry_alerts.set_upstream(wait_for_table_partition_expirations)
    ping_expiry_alerts.set_upstream(probe_scraper)
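
    # Wait an hour after probe scraping before triggering lookml generation, presumably
    # to give the generated artifacts time to propagate to downstream consumers.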
    delay_python_task = PythonOperator(
        task_id="wait_for_1_hour",
        dag=dag,
        python_callable=lambda: time.sleep(60 * 60),
    )

    probe_scraper >> delay_python_task

    # trigger lookml generation
    trigger_looker = TriggerDagRunOperator(
        task_id="trigger_looker",
        trigger_dag_id="looker",
        wait_for_completion=True,
    )

    # This emits a POST request to a netlify webhook URL that triggers a new
    # build of the glean dictionary. We do this after the schema generator has
    # finished running as the dictionary uses the new schema files as part of
    # said build.
    glean_dictionary_netlify_build = SimpleHttpOperator(
        http_conn_id="http_netlify_build_webhook",
        endpoint=Variable.get("glean_dictionary_netlify_build_webhook_id"),
        method="POST",
        data={},
        owner="jrediger@mozilla.com",
        email=[
            "jrediger@mozilla.com",
            "dataops+alerts@mozilla.com",
            "telemetry-alerts@mozilla.com",
        ],
        task_id="glean_dictionary_build",
        dag=dag,
    )

    delay_python_task >> trigger_looker >> glean_dictionary_netlify_build