Skip to content

Commit cd77db1

Browse files
Merge pull request #15 from ProgrammingOperative/orchestration
Orchestration
2 parents dbeb362 + 7ab4689 commit cd77db1

20 files changed

+172
-159
lines changed

,

Whitespace-only changes.
+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
"""Orchestrate the traffic-data ETL: extract and clean a raw CSV, create and
bulk-load a Postgres table, then run the dbt transformations."""
import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Make the DAG folder's parent importable so `postgres_dags` resolves.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(SCRIPT_DIR))

DBT_PROJECT_DIR = "~/dbt"

from postgres_dags.extract_data import ExtractCSV

extract_it = ExtractCSV()

default_args = {
    "owner": "ProgrammingOperative",
    'depends_on_past': False,
    'email': ['wachura11t@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


def extract():
    """Load the raw warehouse CSV, restructure it, and write the cleaned file."""
    data = extract_it.load_csv("~/data/warehousedata.csv")
    restructured_df = extract_it.restructure(data)
    path = '~/data/cleaned.csv'
    restructured_df.to_csv(path)


with DAG(
    'Load_traffic_data',
    default_args=default_args,
    start_date=datetime(2022, 8, 22),
    schedule_interval=None,
    catchup=False,
) as dag:

    # 1) Clean the raw semicolon-delimited CSV into ~/data/cleaned.csv.
    extract_data = PythonOperator(
        task_id='extract_data',
        python_callable=extract,
    )

    # 2) Create the target table if it does not already exist.
    create_traffic_table = PostgresOperator(
        task_id='Create_table',
        postgres_conn_id='postgres_connect',
        sql="sql/create_table.sql",
    )

    # 3) Bulk-load the cleaned data into the table.
    # BUG FIX: this task previously re-ran sql/create_table.sql, so no data
    # was ever loaded. It now runs the COPY script (assumed saved as
    # sql/load_table.sql — confirm the filename in the sql/ folder).
    load_traffic_table = PostgresOperator(
        task_id='Load_table',
        postgres_conn_id='postgres_connect',
        sql="sql/load_table.sql",
    )

    # 4) Run the dbt models over the loaded table.
    transform = BashOperator(
        task_id='dbt_transformation',
        bash_command='cd ~/10Academy/ETL_week11/repository/traffic_data_etl/dbt && dbt run',
    )

    extract_data >> create_traffic_table >> load_traffic_table >> transform
+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import pandas as pd
2+
3+
import os, sys
4+
5+
class ExtractCSV:
    """Load a traffic CSV whose fields pandas read as one ';'-joined column,
    and split it back into properly named columns."""

    def __init__(self) -> None:
        pass

    def load_csv(self, path):
        """Read the CSV at *path* into a pandas DataFrame."""
        return pd.read_csv(path)

    def restructure(self, data):
        """Split the single ';'-delimited column of *data* into ten columns.

        The header of the lone column is itself ';'-joined; each piece
        (whitespace-stripped) becomes a column name. The first ten fields of
        every row are distributed into those columns. Returns the rebuilt
        DataFrame (all values remain strings).
        """
        # Recover the real column names from the fused header.
        cols = [name.strip() for name in data.columns[0].split(";")]

        # One accumulator list per output column; fill them row by row.
        buckets = [[] for _ in range(10)]
        for idx in range(len(data)):
            fields = data.iloc[idx, 0].split(";")
            for col_no, bucket in enumerate(buckets):
                bucket.append(fields[col_no])

        # Rebuild the frame with the ten named columns, in header order.
        extract_df = pd.DataFrame({cols[i]: buckets[i] for i in range(10)})

        print("-----------------Successfully Extracted data---------------------")
        return extract_df
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Traffic observations: one row per tracked-vehicle reading.
-- Executed by the DAG's Create_table task; idempotent via IF NOT EXISTS.
create table if not exists traffic_table
(
    id          BIGSERIAL PRIMARY KEY,      -- surrogate key
    track_id    INT NOT NULL,               -- vehicle track identifier
    vehicle_type VARCHAR(500) NOT NULL,
    traveled_d  VARCHAR(500) NOT NULL,      -- distance traveled
    avg_speed   FLOAT NOT NULL,
    lat         FLOAT NOT NULL,
    lon         FLOAT NOT NULL,
    speed       FLOAT NOT NULL,
    lon_acc     FLOAT NOT NULL,             -- longitudinal acceleration
    lat_acc     FLOAT NOT NULL,             -- lateral acceleration
    record_time FLOAT NOT NULL
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Bulk-load the cleaned traffic CSV into traffic_table.
-- Runs server-side, so the relative FROM path resolves against the Postgres
-- server's working directory.
-- NOTE(review): the DAG's extract task writes '~/data/cleaned.csv' but this
-- script reads 'clean_data.csv' — confirm the two filenames agree.
COPY traffic_table(id, track_id, vehicle_type, traveled_d, avg_speed, lat, lon, speed, lon_acc, lat_acc, record_time)
FROM '../../../../data/clean_data.csv'
DELIMITER ','
CSV HEADER;
5+

airflow/dags/rough.py

-14
This file was deleted.

airflow/dags/workflow.py

-47
This file was deleted.

notebooks/load_data.ipynb

+19
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,25 @@
538538
"\n",
539539
"\n"
540540
]
541+
},
542+
{
543+
"cell_type": "code",
544+
"execution_count": 1,
545+
"metadata": {},
546+
"outputs": [
547+
{
548+
"name": "stdout",
549+
"output_type": "stream",
550+
"text": [
551+
"['/home/wacira/10Academy/Traffic Data ETL week11/repository/traffic_data_etl/notebooks', '/home/wacira/.vscode/extensions/ms-toolsai.jupyter-2022.7.1102252217/pythonFiles', '/home/wacira/.vscode/extensions/ms-toolsai.jupyter-2022.7.1102252217/pythonFiles/lib/python', '/home/wacira/anaconda3/envs/data_engineering/lib/python310.zip', '/home/wacira/anaconda3/envs/data_engineering/lib/python3.10', '/home/wacira/anaconda3/envs/data_engineering/lib/python3.10/lib-dynload', '', '/home/wacira/.local/lib/python3.10/site-packages', '/home/wacira/anaconda3/envs/data_engineering/lib/python3.10/site-packages', '/home/wacira/anaconda3/envs/data_engineering/lib/python3.10/site-packages/GDAL-3.5.0-py3.10-linux-x86_64.egg']\n"
552+
]
553+
}
554+
],
555+
"source": [
556+
"import sys\n",
557+
"\n",
558+
"print(sys.path)"
559+
]
541560
}
542561
],
543562
"metadata": {

screenshots/airflow/dag graph.png

110 KB
Loading
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

scripts/load_data_alternative.py

-67
This file was deleted.

scripts/traffic_flow_dag.py

-31
This file was deleted.

0 commit comments

Comments
 (0)