Skip to content

Commit

Permalink
include datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
edulauer committed Oct 3, 2024
1 parent c0bb345 commit 8607718
Showing 1 changed file with 49 additions and 2 deletions.
51 changes: 49 additions & 2 deletions dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@

from airflow import Dataset
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable
from airflow.providers.common.sql.operators.sql import SQLCheckOperator
from airflow.operators.python import BranchPythonOperator

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

# Constants

DEST_DIR = "download_inlabs"
#XXX update here
DEST_CONN_ID = "database_to_load_inlabs_data"
DEST_CONN_ID = "inlabs_db"
#XXX connection to https://inlabs.in.gov.br/
INLABS_CONN_ID = "inlabs_portal"
#XXX remember to create schema `dou_inlabs` on db
Expand All @@ -41,7 +43,7 @@
@dag(
dag_id="ro-dou_inlabs_load_pg",
default_args=default_args,
schedule="59 3,23 * * *",
schedule="00 15,23 * * *",
catchup=False,
description=__doc__,
max_active_runs=1,
Expand Down Expand Up @@ -213,16 +215,61 @@ def _clean_db(hook: PostgresHook):
outlets=[Dataset("inlabs")]
)

@task.branch
def check_if_first_run_of_day():
    """Branch on whether this is the day's first scheduled run.

    Returns the task_id of the single downstream task to execute:
    the full INLABS dataset trigger on the first run of the day, or
    the "edicao extra" dataset trigger on any later run the same day.
    """
    context = get_current_context()
    execution_date = context["logical_date"]
    prev_execution_date = context["prev_execution_date"]
    logging.info("Execution_date: %s", execution_date)
    logging.info("Prev_execution_date: %s", prev_execution_date)

    # Same calendar day as the previous run -> not the first run today.
    if execution_date.day == prev_execution_date.day:
        logging.info("Não é a primeira execução do dia")
        logging.info("Triggering dataset edicao_extra")
        # BUGFIX: @task.branch must return the downstream task_id, which
        # defaults to the decorated function's *name*. The task is named
        # "trigger_dataset_inlabs_edicao_extra", not
        # "trigger_dataset_edicao_extra" -- the old value matched nothing,
        # so this branch could never be followed.
        return "trigger_dataset_inlabs_edicao_extra"
    logging.info("Primeira execução do dia")
    logging.info("Triggering dataset e DAGs do INLABS")
    return "trigger_dataset_inlabs"


@task(outlets=[Dataset("inlabs_edicao_extra")])
def trigger_dataset_inlabs_edicao_extra():
    """No-op whose sole effect is emitting the "inlabs_edicao_extra"
    Dataset event, waking any DAG scheduled on that dataset."""

@task(outlets=[Dataset("inlabs")])
def trigger_dataset_inlabs():
    """No-op whose sole effect is emitting the "inlabs" Dataset event,
    waking the downstream INLABS DAGs scheduled on that dataset."""


@task(trigger_rule="none_failed_min_one_success")
def remove_directory():
    """Delete the temporary INLABS download directory.

    This task joins the two branch outcomes (exactly one of the upstream
    trigger tasks runs, the other is skipped), so the default
    "all_success" trigger rule would leave it permanently skipped --
    hence the explicit trigger_rule override.
    """
    # Local import: only this task needs shutil.
    import shutil

    dest_path = os.path.join(Variable.get("path_tmp"), DEST_DIR)
    # shutil.rmtree instead of `rm -rf` through a shell: no shell-injection
    # surface, no dependency on an external binary; ignore_errors keeps the
    # original rm -f semantics when the directory is already gone.
    shutil.rmtree(dest_path, ignore_errors=True)
    logging.info("Directory %s removed.", dest_path)

# @task_group(group_id='datasets')
# def trigger_datasets():
# @task.run_if(lambda context: context["task_instance"].execution_date.hour == 15)
# @task(outlets=[Dataset("inlabs")])
# def trigger_dataset_edicao_normal():
# logging.info("Disparando DAGs do INLABS")

# @task.run_if(lambda context: context["task_instance"].execution_date.hour > 15)
# @task(outlets=[Dataset("inlabs_edicao_extra")])
# def trigger_dataset_edicao_extra(**kwargs):
# logging.info(context["task_instance"])
# logging.info("Atualizando o Dataset de Edição Extra")

# trigger_dataset_edicao_normal(), trigger_dataset_edicao_extra()


## Orchestration
# Pipeline: resolve the trigger date once, then
#   download -> load -> SQL data-quality check -> branch on first-run-of-day
#   -> exactly ONE of the two dataset-trigger tasks -> directory cleanup.
# NOTE(review): the branch skips one trigger task, so remove_directory's
# default "all_success" trigger rule would mark it skipped -- confirm the
# task declares a relaxed trigger_rule (e.g. none_failed_min_one_success).
trigger_date = get_date()
download_n_unzip_files(trigger_date) >> \
load_data(trigger_date) >> check_loaded_data >> \
check_if_first_run_of_day() >> \
[trigger_dataset_inlabs_edicao_extra(),trigger_dataset_inlabs()] >> \
remove_directory()


Expand Down

0 comments on commit 8607718

Please sign in to comment.