From 7d436c39a9bcac1f233cf4c01abe8891f9ffb672 Mon Sep 17 00:00:00 2001
From: Eduard Poliakov
Date: Thu, 28 Sep 2023 09:15:28 +0300
Subject: [PATCH] add dags download_data and train_val, remove unused paths in
 preprocess.py

---
 .gitignore                             |  4 ++
 dags/download_data.py                  | 34 ++++++++++++
 dags/train_val.py                      | 76 ++++++++++++++++++++++++++
 src/scheduler/preprocess/preprocess.py |  2 -
 4 files changed, 114 insertions(+), 2 deletions(-)
 create mode 100644 dags/download_data.py
 create mode 100644 dags/train_val.py

diff --git a/.gitignore b/.gitignore
index e31a16d..c762757 100644
--- a/.gitignore
+++ b/.gitignore
@@ -100,3 +100,7 @@ target/
 
 # MLFlow
 /mlruns/
+
+# Data folder
+/src/scheduler/download/data
+/src/scheduler/preprocess/data
diff --git a/dags/download_data.py b/dags/download_data.py
new file mode 100644
index 0000000..aaee253
--- /dev/null
+++ b/dags/download_data.py
@@ -0,0 +1,34 @@
+import os
+
+import airflow
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.docker.operators.docker import DockerOperator
+from docker.types import Mount
+
+
+with DAG(
+    dag_id="airflow_download_data_from_s3",
+    start_date=airflow.utils.dates.days_ago(5),
+    schedule_interval="@daily",
+) as dag:
+
+    get_data = DockerOperator(
+        image="download",
+        command="--s3-bucket sem5-airflow --remote-path remote_tests/{{ ds }} --output-path data/raw/{{ ds }}",
+        task_id="download",
+        do_xcom_push=False,
+        mounts=[
+            Mount(
+                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
+                target="/data",
+                type="bind",
+            )
+        ],
+    )
+
+    notify = BashOperator(
+        task_id="notify", bash_command="echo new rows of data generated ...",
+    )
+
+    get_data >> notify
diff --git a/dags/train_val.py b/dags/train_val.py
new file mode 100644
index 0000000..3bd0362
--- /dev/null
+++ b/dags/train_val.py
@@ -0,0 +1,76 @@
+import os
+
+import airflow
+from airflow import DAG
+from airflow.sensors.filesystem import FileSensor
+from airflow.operators.bash import BashOperator
+from airflow.providers.docker.operators.docker import DockerOperator
+from docker.types import Mount
+
+
+with DAG(
+    dag_id="airflow_train_val",
+    start_date=airflow.utils.dates.days_ago(1),
+    schedule_interval="@daily",
+) as dag:
+    wait_for_data = FileSensor(
+        task_id="wait-for-data",
+        poke_interval=5,
+        retries=5,
+        filepath="data/raw/{{ ds }}/demand_orders.csv",
+    )
+
+    wait_for_another_table = FileSensor(
+        task_id="wait-for-another-table",
+        poke_interval=5,
+        retries=5,
+        filepath="data/raw/{{ ds }}/demand_orders_status.csv",
+    )
+
+    preprocess = DockerOperator(
+        image="preprocess",
+        command="--input-dir /data/raw/{{ ds }} --output-dir /data/processed/{{ ds }} --config configs/train_config.yaml",
+        task_id="preprocess",
+        do_xcom_push=False,
+        mounts=[
+            Mount(
+                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
+                target="/data",
+                type="bind",
+            )
+        ],
+    )
+
+    split = DockerOperator(
+        image="split",
+        command="--input-dir /data/processed/{{ ds }} --output-dir /data/processed/{{ ds }} --test-size 0.2",
+        task_id="split",
+        do_xcom_push=False,
+        mounts=[
+            Mount(
+                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
+                target="/data",
+                type="bind",
+            )
+        ],
+    )
+
+    train = DockerOperator(
+        image="train",
+        command="--input-dir /data/processed/{{ ds }} --output-dir /data/models/{{ ds }} --config configs/train_config.yaml",
+        task_id="train",
+        do_xcom_push=False,
+        mounts=[
+            Mount(
+                source=f"{os.environ['DATA_VOLUME_PATH']}/data",
+                target="/data",
+                type="bind",
+            )
+        ],
+    )
+
+    notify = BashOperator(
+        task_id="notify",
bash_command=f'echo "Model train and validated ... "', + ) + + wait_for_data >> preprocess >> split >> train >> notify diff --git a/src/scheduler/preprocess/preprocess.py b/src/scheduler/preprocess/preprocess.py index 6af42ce..943d35e 100644 --- a/src/scheduler/preprocess/preprocess.py +++ b/src/scheduler/preprocess/preprocess.py @@ -37,8 +37,6 @@ def preprocess(input_dir: str, output_dir: str, config: str): logger.info(f"params: {training_pipeline_params}") demand_orders = pd.read_csv(os.path.join(input_dir, "demand_orders.csv")) demand_orders_status = pd.read_csv(os.path.join(input_dir, "demand_orders_status.csv")) - # demand_orders = pd.read_csv(training_pipeline_params.input_demand_orders) - # demand_orders_status = pd.read_csv(training_pipeline_params.input_demand_orders_status) # Make sku_demand_by_day logger.info("Start training...") logger.info("Make sku demand day...")