-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: 3_hooks_dag.py
40 lines (31 loc) · 1.05 KB
/
3_hooks_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils import dates
import logging
from airflow.hooks.postgres_hook import PostgresHook
# Defaults applied to every task in this DAG (owner label and backdated start).
default_args = {
    "owner": "Tinmar",
    "start_date": dates.days_ago(1),
}
def get_pandas():
    """Query Redshift into a pandas DataFrame and persist it to S3 as CSV.

    Uses the ``redshift_default`` Airflow connection via PostgresHook; the
    S3 write relies on pandas' s3:// URL support (s3fs must be installed).
    Returns nothing — results are only written out and logged.
    """
    hook = PostgresHook('redshift_default')
    frame = hook.get_pandas_df('SELECT * FROM TABLE')
    logging.info('Datos obtenidos de la query')
    # Peek at the first rows in task stdout for quick debugging.
    print(frame.head())
    frame.to_csv('s3://bucket/key.csv', index=False)
    logging.info('Guardado en S3')
# DAG wiring: start -> query-and-persist -> end. Trigger manually only
# (schedule_interval=None) and never backfill missed runs (catchup=False).
with DAG(
    '3_hooks_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['Curso 2', 'Apache_Airflow'],
) as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    extract_to_s3 = PythonOperator(
        task_id='get_pandas',
        python_callable=get_pandas,
    )

    start >> extract_to_s3 >> end