Skip to content

Commit 9784664

Browse files
authored
Merge pull request tuanavu#12 from tuanavu/development
Airflow variables tutorial
2 parents 86da3c9 + d5eac3a commit 9784664

File tree

6 files changed

+99
-6
lines changed

6 files changed

+99
-6
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@ Check http://localhost:8080/
5050

5151
## Other commands
5252

53-
If you want to run other airflow sub-commands, you can do so like this:
53+
If you want to run airflow sub-commands, you can do so like this:
5454

5555
- `docker-compose run --rm webserver airflow list_dags` - List dags
5656
- `docker-compose run --rm webserver airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE]` - Test specific task
5757

58+
If you want to run or test a Python script, you can do so like this:
59+
- `docker-compose run --rm webserver python /usr/local/airflow/dags/[PYTHON-FILE].py` - Test python script
60+
5861
## Connect to database
5962

6063
If you want to use Ad hoc query, make sure you've configured connections:

examples/gcloud-example/dags/bigquery_github/bigquery_github_trends.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,17 @@
22
from datetime import timedelta, datetime
33

44
from airflow import DAG
5+
from airflow.models import Variable
56
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
67
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
78

9+
10+
# Config variables
11+
dag_config = Variable.get("bigquery_github_trends_variables", deserialize_json=True)
12+
BQ_CONN_ID = dag_config["bq_conn_id"]
13+
BQ_PROJECT = dag_config["bq_project"]
14+
BQ_DATASET = dag_config["bq_dataset"]
15+
816
default_args = {
917
'owner': 'airflow',
1018
'depends_on_past': True,
@@ -28,11 +36,6 @@
2836
schedule_interval=schedule_interval
2937
)
3038

31-
# Config variables
32-
BQ_CONN_ID = "my_gcp_conn"
33-
BQ_PROJECT = "my-bq-project"
34-
BQ_DATASET = "my-bq-dataset"
35-
3639
## Task 1: check that the github archive data has a dated table created for that date
3740
# To test this task, run this command:
3841
# docker-compose -f docker-compose-gcloud.yml run --rm webserver airflow test bigquery_github_trends bq_check_githubarchive_day 2018-12-01
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"bigquery_github_trends_variables": {
3+
"bq_conn_id": "my_gcp_conn",
4+
"bq_project": "my_bq_project",
5+
"bq_dataset": "my_bq_dataset"
6+
}
7+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Ignore everything in this directory
2+
*
3+
# Except this file
4+
!.gitignore
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"example_variables_config": {
3+
"var1": "value1",
4+
"var2": [1, 2, 3],
5+
"var3": {
6+
"k": "value3"
7+
}
8+
}
9+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
"""Tutorial DAG showing the recommended way to read Airflow Variables."""
from __future__ import print_function

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 15),
    'end_date': datetime(2019, 2, 15),
}

dag = DAG(
    'example_variables',
    schedule_interval="@once",
    default_args=default_args,
)

# --- Config variables -----------------------------------------------------
# The example values behind the keys used below:
#   var1 = "value1"
#   var2 = [1, 2, 3]
#   var3 = {'k': 'value3'}
#
# Fetching each key separately costs one metadata-DB round trip apiece:
#   var1 = Variable.get("var1")
#   var2 = Variable.get("var2")
#   var3 = Variable.get("var3")
#
# Recommended: store them as one JSON Variable and deserialize it once.
dag_config = Variable.get("example_variables_config", deserialize_json=True)
var1 = dag_config["var1"]
var2 = dag_config["var2"]
var3 = dag_config["var3"]

start = DummyOperator(task_id="start", dag=dag)

# To test this task, run:
#   docker-compose run --rm webserver airflow test example_variables get_dag_config 2019-02-15
t1 = BashOperator(
    task_id="get_dag_config",
    bash_command='echo "{0}"'.format(dag_config),
    dag=dag,
)

# Variables can also be read directly from a Jinja template:
#   {{ var.value.<variable_name> }}  -> the raw string value
t2 = BashOperator(
    task_id="get_variable_value",
    bash_command='echo {{ var.value.var3 }} ',
    dag=dag,
)

#   {{ var.json.<variable_name> }}  -> the JSON-deserialized value
t3 = BashOperator(
    task_id="get_variable_json",
    bash_command='echo {{ var.json.example_variables_config.var3 }} ',
    dag=dag,
)

start >> [t1, t2, t3]

0 commit comments

Comments
 (0)