Skip to content

Commit

Permalink
add dags download_data and train_val, removed unused paths in preprocess.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Edipool committed Sep 28, 2023
1 parent 9f7d8fd commit 7d436c3
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ target/

# MLFlow
/mlruns/

# Data folder
/src/scheduler/download/data
/src/scheduler/preprocess/data
34 changes: 34 additions & 0 deletions dags/download_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""DAG that downloads the day's raw data from S3 into the shared data volume.

Runs daily; the templated ``{{ ds }}`` (execution date) partitions both the
remote S3 path and the local output path so each run is isolated.
"""
import os

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount


with DAG(
    dag_id="airflow_download_data_from_s3",
    start_date=airflow.utils.dates.days_ago(5),
    schedule_interval="@daily",
) as dag:

    # Containerized downloader: pulls remote_tests/<ds> from the S3 bucket
    # into data/raw/<ds> on the bind-mounted host volume.
    get_data = DockerOperator(
        image="download",
        command="--s3-bucket sem5-airflow --remote-path remote_tests/{{ ds }} --output-path data/raw/{{ ds }}",
        task_id="download",
        do_xcom_push=False,
        mounts=[
            # Bind-mount the host data directory so downloaded files persist
            # outside the container; DATA_VOLUME_PATH must be set in the
            # scheduler's environment.
            Mount(
                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
                target="/data",
                type="bind",
            )
        ],
    )

    notify = BashOperator(
        # Plain string: no placeholders, so the f-prefix was unnecessary (F541).
        task_id="notify", bash_command="echo new rows of data generated ...",
    )

    get_data >> notify
76 changes: 76 additions & 0 deletions dags/train_val.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""DAG that preprocesses, splits, trains, and validates the daily dataset.

Waits for both raw CSVs produced by the download DAG, then runs the
containerized preprocess -> split -> train pipeline over the ``{{ ds }}``
partition of the shared data volume.
"""
import os

import airflow
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount


with DAG(
    dag_id="airflow_train_val",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@daily",
) as dag:
    # Sensor for the first raw table required by preprocess.
    wait_for_data = FileSensor(
        task_id="wait-for-data",
        poke_interval=5,
        retries=5,
        filepath="data/raw/{{ ds }}/demand_orders.csv",
    )

    # Sensor for the second raw table; preprocess reads both CSVs.
    wait_for_another_table = FileSensor(
        task_id="wait-for-another-table",
        poke_interval=5,
        retries=5,
        filepath="data/raw/{{ ds }}/demand_orders_status.csv",
    )

    preprocess = DockerOperator(
        image="preprocess",
        command="--input-dir /data/raw/{{ ds }} --output-dir /data/processed/{{ ds }} --config configs/train_config.yaml",
        task_id="preprocess",
        do_xcom_push=False,
        mounts=[
            # Bind-mount the host data directory; DATA_VOLUME_PATH must be
            # set in the scheduler's environment.
            Mount(
                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
                target="/data",
                type="bind",
            )
        ],
    )

    # Train/test split over the preprocessed partition (reads and writes the
    # same processed directory).
    split = DockerOperator(
        image="split",
        command="--input-dir /data/processed/{{ ds }} --output-dir /data/processed/{{ ds }} --test-size 0.2",
        task_id="split",
        do_xcom_push=False,
        mounts=[
            Mount(
                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
                target="/data",
                type="bind",
            )
        ],
    )

    train = DockerOperator(
        image="train",
        command="--input-dir /data/processed/{{ ds }} --output-dir /data/models/{{ ds }} --config configs/train_config.yaml",
        task_id="train",
        do_xcom_push=False,
        mounts=[
            Mount(
                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
                target="/data",
                type="bind",
            )
        ],
    )

    notify = BashOperator(
        # Plain string: no placeholders, so the f-prefix was unnecessary (F541).
        task_id="notify", bash_command='echo "Model train and validated ... "',
    )

    # BUGFIX: the second sensor was previously defined but never wired into
    # the graph, so preprocess could start before demand_orders_status.csv
    # existed. Gate preprocess on BOTH sensors.
    [wait_for_data, wait_for_another_table] >> preprocess >> split >> train >> notify
2 changes: 0 additions & 2 deletions src/scheduler/preprocess/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ def preprocess(input_dir: str, output_dir: str, config: str):
logger.info(f"params: {training_pipeline_params}")
demand_orders = pd.read_csv(os.path.join(input_dir, "demand_orders.csv"))
demand_orders_status = pd.read_csv(os.path.join(input_dir, "demand_orders_status.csv"))
# demand_orders = pd.read_csv(training_pipeline_params.input_demand_orders)
# demand_orders_status = pd.read_csv(training_pipeline_params.input_demand_orders_status)
# Make sku_demand_by_day
logger.info("Start training...")
logger.info("Make sku demand day...")
Expand Down

0 comments on commit 7d436c3

Please sign in to comment.