
Commit b8b85df

Update dag
1 parent 744abb9 commit b8b85df

File tree

2 files changed: +64 -39

New file: +31 lines
@@ -0,0 +1,31 @@
+import json
+import pathlib
+
+import airflow
+import requests
+import requests.exceptions as requests_exceptions
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+
+import sys
+import os
+
+sys.path.insert(0, '/home/wacira/10 Academy/week 11/repository/traffic_data_etl/scripts')
+
+dag = DAG(
+    dag_id="Try_dag_out",
+    start_date=airflow.utils.dates.days_ago(14),
+    schedule_interval=None,
+)
+
+def say_hello():
+    return 'HellO Titus'
+
+say_hello = PythonOperator(
+    task_id="Hello_to_me",
+    python_callable=say_hello,
+    dag=dag,
+)
+
+say_hello
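
For reference, a minimal sketch (not from this commit) of the same one-task DAG written with the `with DAG(...)` context manager, which avoids rebinding `say_hello` to the operator and the trailing no-op expression; it assumes Airflow 2.x, and the variable name `hello_task` is illustrative:

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator


def say_hello():
    # The return value is pushed to XCom by PythonOperator.
    return 'HellO Titus'


# Tasks created inside the context manager are attached to the DAG
# automatically, so no dag=dag argument is needed.
with DAG(
    dag_id="Try_dag_out",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
) as dag:
    hello_task = PythonOperator(
        task_id="Hello_to_me",
        python_callable=say_hello,
    )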

airflow/dags/workflow.py

+33 -39
@@ -13,54 +13,48 @@

sys.path.insert(0, '/home/wacira/10 Academy/week 11/repository/traffic_data_etl/scripts')

-dag = DAG(
-    dag_id="Try_dag_out",
-    start_date=airflow.utils.dates.days_ago(14),
-    schedule_interval=None,
-)

-def hello_world():
-    return 'Hellow Airflow'
+import load_data
+import extract_data
+from load_data import LoadToDB
+from extract_data import ExtractCSV

-say_hello = PythonOperator(
-    task_id="say_hellowz",
-    python_callable=hello_world,
-    dag=dag,
-)
-
-say_hello
-

-# import load_data
-# import extract_data
-# from load_data import LoadToDB
-# from extract_data import ExtractCSV
+args = {
+    'depends_on_past': False,
+    'email': ['airflow@example.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5)
+}

-# def load_restructure():
-# return LoadToDB.load_to_db()
+def load_restructure():
+    return LoadToDB.load_to_db()

-# def extract():
-# return ExtractCSV.load_and_restructure()
+def extract():
+    return ExtractCSV.load_and_restructure()

-# dag = DAG(
-# dag_id="Load_Data_files",
-# start_date=airflow.utils.dates.days_ago(14),
-# schedule_interval=None,
-# )
+dag = DAG(
+    dag_id="ELT_orchestration",
+    default_args=args,
+    start_date=airflow.utils.dates.days_ago(14),
+    schedule_interval=None,
+)

-# extract_data = PythonOperator(
-# task_id="download_launches",
-# python_callable=extract,
-# dag=dag,
-# )
+extract_data = PythonOperator(
+    task_id="download_launches",
+    python_callable=extract,
+    dag=dag,
+)


-# load_data = PythonOperator(
-# task_id="load_data",
-# python_callable=load_restructure,
-# dag=dag,
-# )
+load_data = PythonOperator(
+    task_id="load_data",
+    python_callable=load_restructure,
+    dag=dag,
+)


-# extract_data >> load_data
+extract_data >> load_data

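
For reference, a consolidated sketch (not from this commit) of what workflow.py could look like after this hunk. The imports shown here are assumptions standing in for the unshown lines 1-12: in particular, `retry_delay` requires `timedelta`, so `from datetime import timedelta` must already be imported above, and the project modules `extract_data` and `load_data` resolve only because of the earlier `sys.path.insert` call.

from datetime import timedelta  # assumed import; retry_delay needs it

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator

from extract_data import ExtractCSV  # project module added to the imports by this commit
from load_data import LoadToDB       # project module added to the imports by this commit

args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def extract():
    # Pull and reshape the raw CSV data via the project's ExtractCSV helper.
    return ExtractCSV.load_and_restructure()


def load_restructure():
    # Load the restructured data into the database via LoadToDB.
    return LoadToDB.load_to_db()


with DAG(
    dag_id="ELT_orchestration",
    default_args=args,
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
) as dag:
    extract_data = PythonOperator(
        task_id="download_launches",
        python_callable=extract,
    )
    load_data = PythonOperator(
        task_id="load_data",
        python_callable=load_restructure,
    )

    # '>>' declares the dependency: load_data runs only after extract_data succeeds.
    extract_data >> load_data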

0 commit comments
