Skip to content

Commit 61a5d43

Browse files
authored
Dag file
1 parent 988702c commit 61a5d43

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from datetime import datetime, timedelta
2+
import os
3+
from airflow import DAG
4+
from airflow.operators.dummy_operator import DummyOperator
5+
from airflow.operators import (StageToRedshiftOperator, LoadFactOperator,
6+
LoadDimensionOperator, DataQualityOperator)
7+
from helpers import SqlQueries as sqlq
8+
9+
# AWS_KEY = os.environ.get('AWS_KEY')
10+
# AWS_SECRET = os.environ.get('AWS_SECRET')
11+
create_tables_qry=''
12+
with open('/home/workspace/airflow/create_tables.sql', 'r') as file:
13+
create_tables_qry = file.read().replace('\n', '')
14+
default_args = {
15+
'owner': 'udacity',
16+
'start_date': datetime(2019, 1, 12),
17+
'depends_on_past': False,
18+
'retries': 3,
19+
'retry_delay': timedelta(minutes=5),
20+
'email_on_retry': False,
21+
}
22+
23+
dag = DAG('udac_example_dag',
24+
default_args=default_args,
25+
description='Load and transform data in Redshift with Airflow',
26+
schedule_interval='0 * * * *',
27+
catchup=False,
28+
)
29+
30+
start_operator = DummyOperator(task_id='Begin_execution', dag=dag)
31+
32+
create_tables = LoadDimensionOperator(
33+
task_id='Create_tables',
34+
dag=dag,
35+
conn_id='postgres_default',
36+
queries_list=create_tables_qry
37+
)
38+
stage_events_to_redshift = StageToRedshiftOperator(
39+
task_id='Stage_events',
40+
dag=dag,
41+
conn_id='postgres_default',
42+
s3_src_path='s3://udacity-dend/log_data',
43+
table='staging_events',
44+
iam_role='',
45+
json_type='s3://udacity-dend/log_json_path.json',
46+
region='us-west-2',
47+
extra_params="timeformat as 'epochmillisecs'"
48+
)
49+
50+
stage_songs_to_redshift = StageToRedshiftOperator(
51+
task_id='Stage_songs',
52+
dag=dag,
53+
conn_id='postgres_default',
54+
s3_src_path='s3://udacity-dend/song_data',
55+
table='staging_songs',
56+
iam_role='',
57+
json_type='auto',
58+
region='us-west-2',
59+
extra_params=''
60+
)
61+
62+
load_songplays_table = LoadFactOperator(
63+
task_id='Load_songplays_fact_table',
64+
dag=dag,
65+
conn_id='postgres_default',
66+
query=sqlq.songplay_table_insert
67+
)
68+
69+
load_user_dimension_table = LoadDimensionOperator(
70+
task_id='Load_user_dim_table',
71+
dag=dag,
72+
conn_id='postgres_default',
73+
queries_list=sqlq.user_table_insert
74+
)
75+
76+
load_song_dimension_table = LoadDimensionOperator(
77+
task_id='Load_song_dim_table',
78+
dag=dag,
79+
conn_id='postgres_default',
80+
query=sqlq.song_table_insert
81+
)
82+
83+
load_artist_dimension_table = LoadDimensionOperator(
84+
task_id='Load_artist_dim_table',
85+
dag=dag,
86+
conn_id='postgres_default',
87+
query=sqlq.artist_table_insert
88+
)
89+
90+
load_time_dimension_table = LoadDimensionOperator(
91+
task_id='Load_time_dim_table',
92+
dag=dag,
93+
conn_id='postgres_default',
94+
query=sqlq.time_table_insert
95+
)
96+
97+
run_quality_checks = DataQualityOperator(
98+
task_id='Run_data_quality_checks',
99+
dag=dag,
100+
conn_id='postgres_default'
101+
102+
103+
)
104+
105+
end_operator = DummyOperator(task_id='Stop_execution', dag=dag)
106+
107+
start_operator >> create_tables >> [stage_events_to_redshift,stage_songs_to_redshift] >> load_songplays_table
108+
load_songplays_table >> [load_song_dimension_table,load_user_dimension_table,load_artist_dimension_table,load_time_dimension_table] >> run_quality_checks
109+
run_quality_checks >>end_operator

0 commit comments

Comments
 (0)