Commit d481174

Author: Tuan Vu (committed)
Add bigquery_github_trends DAG
1 parent 78f572f commit d481174

1 file changed: 261 additions, 0 deletions

import json
from datetime import timedelta, datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 12, 1),
    'end_date': datetime(2018, 12, 5),
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Set Schedule: Run pipeline once a day.
# Use cron to define the exact time. E.g. 8:15 AM would be "15 08 * * *"
schedule_interval = "00 21 * * *"

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    'bigquery_github_trends',
    default_args=default_args,
    schedule_interval=schedule_interval
)

# Config variables
BQ_CONN_ID = "my_gcp_conn"    # Airflow connection ID for Google Cloud / BigQuery
BQ_PROJECT = "my-bq-project"
BQ_DATASET = "my_bq_dataset"  # BigQuery dataset IDs may only contain letters, numbers, and underscores

## Task 1: check that the github archive data has a dated table created for that date
# To test this task, run this command:
# docker-compose -f docker-compose-gcloud.yml run --rm webserver airflow test bigquery_github_trends bq_check_githubarchive_day 2018-12-01
t1 = BigQueryCheckOperator(
    task_id='bq_check_githubarchive_day',
    sql='''
    #standardSQL
    SELECT
      table_id
    FROM
      `githubarchive.day.__TABLES_SUMMARY__`
    WHERE
      table_id = "{{ yesterday_ds_nodash }}"
    ''',
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

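# The same test pattern should work for the remaining task_ids in this file
# (assuming the docker-compose-gcloud.yml setup referenced above), e.g.:
# docker-compose -f docker-compose-gcloud.yml run --rm webserver airflow test bigquery_github_trends bq_check_hackernews_full 2018-12-01
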
## Task 2: check that the hacker news table contains data for that date
t2 = BigQueryCheckOperator(
    task_id='bq_check_hackernews_full',
    sql='''
    #standardSQL
    SELECT
      FORMAT_TIMESTAMP("%Y%m%d", timestamp) AS date
    FROM
      `bigquery-public-data.hacker_news.full`
    WHERE
      type = 'story'
      AND FORMAT_TIMESTAMP("%Y%m%d", timestamp) = "{{ yesterday_ds_nodash }}"
    LIMIT
      1
    ''',
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

## Task 3: aggregate github events to a daily partition table
t3 = BigQueryOperator(
    task_id='bq_write_to_github_daily_metrics',
    sql='''
    #standardSQL
    SELECT
      date,
      repo,
      SUM(IF(type='WatchEvent', 1, NULL)) AS stars,
      SUM(IF(type='ForkEvent', 1, NULL)) AS forks
    FROM (
      SELECT
        FORMAT_TIMESTAMP("%Y%m%d", created_at) AS date,
        actor.id as actor_id,
        repo.name as repo,
        type
      FROM
        `githubarchive.day.{{ yesterday_ds_nodash }}`
      WHERE type IN ('WatchEvent','ForkEvent')
    )
    GROUP BY
      date,
      repo
    ''',
    # The $YYYYMMDD suffix is BigQuery's partition decorator: with WRITE_TRUNCATE,
    # only yesterday's partition of the destination table is replaced.
    destination_dataset_table='{0}.{1}.github_daily_metrics${2}'.format(
        BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
    ),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

## Task 4: aggregate github_daily_metrics into 1-, 7-, and 28-day rolling metrics per repo
t4 = BigQueryOperator(
    task_id='bq_write_to_github_agg',
    sql='''
    #standardSQL
    SELECT
      "{{ yesterday_ds_nodash }}" as date,
      repo,
      SUM(stars) as stars_last_28_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -6) }}")
        AND TIMESTAMP("{{ yesterday_ds }}"),
        stars, null)) as stars_last_7_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}")
        AND TIMESTAMP("{{ yesterday_ds }}"),
        stars, null)) as stars_last_1_day,
      SUM(forks) as forks_last_28_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -6) }}")
        AND TIMESTAMP("{{ yesterday_ds }}"),
        forks, null)) as forks_last_7_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}")
        AND TIMESTAMP("{{ yesterday_ds }}"),
        forks, null)) as forks_last_1_day
    FROM
      -- NOTE: hardcoded project/dataset; this should normally point at the
      -- github_daily_metrics table written by Task 3 (BQ_PROJECT.BQ_DATASET).
      `viant-data-science.github_trends.github_daily_metrics`
    WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -27) }}")
      AND TIMESTAMP("{{ yesterday_ds }}")
    GROUP BY
      date,
      repo
    ''',
    destination_dataset_table='{0}.{1}.github_agg${2}'.format(
        BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
    ),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

## Task 5: aggregate hacker news data to a daily partition table
t5 = BigQueryOperator(
    task_id='bq_write_to_hackernews_agg',
    sql='''
    #standardSQL
    SELECT
      FORMAT_TIMESTAMP("%Y%m%d", timestamp) AS date,
      `by` AS submitter,
      id as story_id,
      REGEXP_EXTRACT(url, "(https?://github.com/[^/]*/[^/#?]*)") as url,
      SUM(score) as score
    FROM
      `bigquery-public-data.hacker_news.full`
    WHERE
      type = 'story'
      AND timestamp>'{{ yesterday_ds }}'
      AND timestamp<'{{ ds }}'
      AND url LIKE '%https://github.com%'
      AND url NOT LIKE '%github.com/blog/%'
    GROUP BY
      date,
      submitter,
      story_id,
      url
    ''',
    destination_dataset_table='{0}.{1}.hackernews_agg${2}'.format(
        BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
    ),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

## Task 6: join the hackernews and github aggregates on the github url
# To test this task, run this command:
# airflow test bigquery_github_trends bq_write_to_hackernews_github_agg 2018-12-01
t6 = BigQueryOperator(
    task_id='bq_write_to_hackernews_github_agg',
    sql='''
    #standardSQL
    SELECT
      a.date as date,
      a.url as github_url,
      b.repo as github_repo,
      a.score as hn_score,
      a.story_id as hn_story_id,
      b.stars_last_28_days as stars_last_28_days,
      b.stars_last_7_days as stars_last_7_days,
      b.stars_last_1_day as stars_last_1_day,
      b.forks_last_28_days as forks_last_28_days,
      b.forks_last_7_days as forks_last_7_days,
      b.forks_last_1_day as forks_last_1_day
    FROM
      (SELECT
        *
      FROM
        `{0}.{1}.hackernews_agg`
      WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{2}") AND TIMESTAMP("{2}")
      ) as a
    LEFT JOIN
      (
      SELECT
        repo,
        CONCAT('https://github.com/', repo) as url,
        stars_last_28_days,
        stars_last_7_days,
        stars_last_1_day,
        forks_last_28_days,
        forks_last_7_days,
        forks_last_1_day
      FROM
        `{0}.{1}.github_agg`
      WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{2}") AND TIMESTAMP("{2}")
      ) as b
    ON a.url = b.url
    '''.format(
        BQ_PROJECT, BQ_DATASET, "{{ yesterday_ds }}"
    ),
    destination_dataset_table='{0}.{1}.hackernews_github_agg${2}'.format(
        BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds_nodash }}'
    ),
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

## Task 7: check if partition data is written successfully
t7 = BigQueryCheckOperator(
    task_id='bq_check_hackernews_github_agg',
    sql='''
    #standardSQL
    SELECT
      COUNT(*) AS rows_in_partition
    FROM `{0}.{1}.hackernews_github_agg`
    WHERE _PARTITIONDATE = "{2}"
    '''.format(BQ_PROJECT, BQ_DATASET, '{{ yesterday_ds }}'),
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)

# Setting up Dependencies
t3.set_upstream(t1)
t4.set_upstream(t3)
t5.set_upstream(t2)
t6.set_upstream(t4)
t6.set_upstream(t5)
t7.set_upstream(t6)
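
# Equivalent wiring of the same dependency graph using Airflow's bitshift
# operators (an alternative sketch of the lines above, not extra dependencies):
# t1 >> t3 >> t4
# t2 >> t5
# [t4, t5] >> t6 >> t7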
