1
1
import json
2
2
import pathlib
3
+ from datetime import datetime , timedelta
4
+
3
5
4
6
import airflow
5
7
import requests
12
14
import sys
13
15
import os
14
16
15
- dag = DAG (
16
- dag_id = "Load_traffic_data" ,
17
- start_date = airflow .utils .dates .days_ago (1 ),
17
+ default_args = {
18
+ 'depends_on_past' : False ,
19
+ 'email' : ['airflow@example.com' ],
20
+ 'email_on_failure' : False ,
21
+ 'email_on_retry' : False ,
22
+ 'retries' : 1 ,
23
+ 'retry_delay' : timedelta (minutes = 5 )
24
+ }
25
+
26
+ with DAG (
27
+ 'Load_traffic_data' ,
28
+ default_args = default_args ,
29
+ start_date = datetime (2022 , 8 , 20 ),
18
30
schedule_interval = None ,
19
31
catchup = False ,
20
- )
32
+ ) as dag :
21
33
22
- create_traffic_table = PostgresOperator (
23
- task_id = 'Create_table'
24
- postgres_conn_id = " postgres_connect" ,
25
- sql = "sql/create_table.sql" ,
26
- )
34
+ create_traffic_table = PostgresOperator (
35
+ task_id = 'Create_table' ,
36
+ postgres_conn_id = ' postgres_connect' ,
37
+ sql = "sql/create_table.sql" ,
38
+ )
27
39
28
- load_traffic_table = PostgresOperator (
29
- task_id = 'Load_table'
30
- postgres_conn_id = " postgres_connect" ,
31
- sql = "sql/create_table.sql" ,
32
- )
40
+ load_traffic_table = PostgresOperator (
41
+ task_id = 'Load_table' ,
42
+ postgres_conn_id = ' postgres_connect' ,
43
+ sql = "sql/create_table.sql" ,
44
+ )
33
45
34
46
create_traffic_table >> load_traffic_table
0 commit comments