Skip to content

Commit

Permalink
Changes to demos
Browse files Browse the repository at this point in the history
  • Loading branch information
virajmparekh committed Jun 23, 2019
1 parent 7e447e5 commit 5a9a8d7
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 26 deletions.
91 changes: 91 additions & 0 deletions dags/example-dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Baseline operator settings inherited by every task in this DAG.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2018, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Demo DAG scheduled every five minutes; at most three runs may overlap.
dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
    max_active_runs=3,
)

# The twelve print_date tasks are identical except for their index, so build
# them in a loop instead of twelve copy-pasted stanzas.  Each task sleeps a
# random 1-30 seconds, which staggers durations in the graph/Gantt views.
# NOTE(review): this replaces module-level names t1..t12 with a dict; DAG
# files are normally only parsed by Airflow, but confirm nothing imports
# those names directly.
tasks = {}
for i in range(1, 13):
    tasks[i] = BashOperator(
        task_id='print_date{0}'.format(i),
        bash_command='sleep $[ ( $RANDOM % 30 ) + 1 ]s',
        dag=dag)

# Dependency graph, identical to the original hand-wired version:
#   1 -> 2 -> 3 -> {4, 5, 6, 7, 8};  8 -> {9, 10, 11, 12}
tasks[2].set_upstream(tasks[1])
tasks[3].set_upstream(tasks[2])
for i in range(4, 9):
    tasks[i].set_upstream(tasks[3])
for i in range(9, 13):
    tasks[i].set_upstream(tasks[8])
95 changes: 77 additions & 18 deletions dags/example_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,94 @@
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.models import Variable
from airflow.operators.slack_operator import SlackAPIPostOperator

import json


# Static job definitions used to generate one DAG per source below.
# NOTE(review): the previous `jobs = json.loads(Variable.get('jobs'))` line
# was dead code -- immediately overwritten by this literal -- and it made DAG
# parsing crash whenever the 'jobs' Airflow Variable was missing; removed.
# NOTE(review): "source_C" appears twice, so the second create_dag() call for
# it overwrites the first (same dag_id) -- confirm that is intentional.
jobs = [
    {"source": "source_A", "destination": "destination_A", "image": "image_A"},
    {"source": "source_B", "destination": "destination_B", "image": "image_B"},
    {"source": "source_C", "destination": "destination_C", "image": "image_A"},
    {"source": "source_C", "destination": "destination_C", "image": "image_C"},
    {"source": "source_D", "destination": "destination_D", "image": "image_D"},
    {"source": "source_E", "destination": "destination_E", "image": "image_E"},
    {"source": "source_F", "destination": "destination_F", "image": "image_F"},
    {"source": "source_G", "destination": "destination_G", "image": "image_G"},
    {"source": "source_H", "destination": "destination_H", "image": "image_H"},
    {"source": "source_I", "destination": "destination_I", "image": "image_I"},
    {"source": "source_J", "destination": "destination_J", "image": "image_J"},
    {"source": "source_K", "destination": "destination_K", "image": "image_K"},
]


def create_dag(dag_id,
               schedule,
               default_args,
               job):
    """Build one DAG for *job* that waits on its etl_dag task, then posts to Slack.

    The chain is: start -> ExternalTaskSensor (waits for
    ``talend_etl_<source>`` in the ``etl_dag`` DAG) -> Slack notification,
    plus extra fan-out tasks for source_B.

    NOTE(review): the pre-edit body (hello_world_py + a per-job loop) was left
    interleaved with the new body by a bad merge, which made this function a
    syntax error; only the new version is kept here.
    """
    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args,
              catchup=False,
              max_active_runs=1)

    with dag:
        start = DummyOperator(task_id='start')

        # Block until the matching Talend task in etl_dag has succeeded.
        t1 = ExternalTaskSensor(task_id='wait_for_talend_{0}'
                                .format(job['source']),
                                external_dag_id='etl_dag',
                                external_task_id='talend_etl_{0}'
                                .format(job['source']))

        t2 = SlackAPIPostOperator(task_id='post_slack_{0}'.format(job['source']),
                                  username='ETL',
                                  slack_conn_id='slack_conn',
                                  text="My job {0} finished".format(
                                      job['source']),
                                  channel='biz-cloud-billing')

        start >> t1 >> t2

        # Team B gets extra downstream alerting tasks.
        # NOTE(review): `t2 >> m` is wired inside the loop so every alerting
        # task depends on the Slack post, not just the last one -- confirm
        # against the intended graph.
        if job['source'] == 'source_B':
            for i in range(0, 15):
                m = DummyOperator(task_id='TeamB_alerting_logic_{0}'.format(i))
                t2 >> m

    return dag

Expand All @@ -51,4 +109,5 @@ def hello_world_py(*args):

globals()[dag_id] = create_dag(dag_id,
schedule,
default_args)
default_args,
job)
90 changes: 82 additions & 8 deletions dags/kube_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

from airflow.operators.dummy_operator import DummyOperator
import json
import os

NAMESPACE = os.getenv('HOSTNAME').split('-schedu')[0]
default_args = {
'owner': 'airflow',
'depends_on_past': False,
Expand All @@ -15,24 +19,94 @@
'retry_delay': timedelta(minutes=5),
}

# Parent ETL DAG: the generated per-source DAGs in
# example_external_task_sensor.py wait on its talend_etl_* tasks.
# NOTE(review): the stale pre-edit header `DAG('etl_dag',
# schedule_interval='@once', ...)` left by a bad merge was removed -- two
# interleaved DAG(...) headers are a syntax error.
dag = DAG(dag_id='etl_dag',
          schedule_interval='@daily',
          catchup=False,
          default_args=default_args)

# Static job definitions; one KubernetesPodOperator + Slack post per entry.
# NOTE(review): the dead `jobs = json.loads(Variable.get('jobs'))` line was
# removed -- it was immediately overwritten by this literal and crashed DAG
# parsing when the 'jobs' Variable was missing.  To drive this from an
# Airflow Variable instead, use:
#     jobs = Variable.get("jobs", deserialize_json=True)
# NOTE(review): "source_C" appears twice, which yields duplicate task_ids in
# the loop below -- confirm whether that is intentional.
jobs = [
    {"source": "source_A", "destination": "destination_A", "image": "image_A"},
    {"source": "source_B", "destination": "destination_B", "image": "image_B"},
    {"source": "source_C", "destination": "destination_C", "image": "image_A"},
    {"source": "source_C", "destination": "destination_C", "image": "image_C"},
    {"source": "source_D", "destination": "destination_D", "image": "image_D"},
    {"source": "source_E", "destination": "destination_E", "image": "image_E"},
    {"source": "source_F", "destination": "destination_F", "image": "image_F"},
    {"source": "source_G", "destination": "destination_G", "image": "image_G"},
    {"source": "source_H", "destination": "destination_H", "image": "image_H"},
    {"source": "source_I", "destination": "destination_I", "image": "image_I"},
    {"source": "source_J", "destination": "destination_J", "image": "image_J"},
    {"source": "source_K", "destination": "destination_K", "image": "image_K"},
]
with dag:

    # Single root so every per-job branch hangs off one anchor task.
    start = DummyOperator(task_id="start")

    for job in jobs:
        # Placeholder pod standing in for the real Talend ETL container.
        # NOTE(review): the merged diff left duplicate namespace=/name=/image=
        # keyword arguments here (a syntax error); the stale pre-edit values
        # (namespace='', image="talend_job_...") were dropped.  The source
        # showed both name="talend_etl_{0}" and name='datarouter-glowing-
        # lens-5847'; the per-job name is kept so pod names stay unique --
        # confirm.  Also confirm whether the hardcoded namespace should be
        # the NAMESPACE constant computed above.
        k = KubernetesPodOperator(
            task_id="talend_etl_{0}".format(job['source']),
            labels={"foo": "bar"},
            name="talend_etl_{0}".format(job['source']),
            namespace='datarouter-glowing-lens-5847',
            image="alpine:latest",
            in_cluster=True,
            service_account='glowing-lens-5847-worker-serviceaccount',
            get_logs=True)

        # Notify Slack once the pod task finishes.
        t2 = SlackAPIPostOperator(task_id='post_slack_{0}'.format(job['source']),
                                  username='ETL',
                                  slack_conn_id='slack_conn',
                                  text="My job {0} finished".format(
                                      job['source']),
                                  channel='biz-cloud-billing')

        # Team-specific fan-out after the Slack notification.  `t2 >> m` is
        # wired inside each loop so every generated task depends on t2.
        if job['source'] == 'source_B':
            for i in range(0, 6):
                m = DummyOperator(task_id='TeamB_alerting_logic_{0}'.format(i))
                t2 >> m

        if job['source'] == 'source_K':
            for i in range(0, 3):
                m = DummyOperator(task_id='Spark_Jobs_{0}'.format(i))
                t2 >> m

        # Stale pre-edit `start >> k` removed; superseded by this chain.
        start >> k >> t2
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
noop
slackclient<2.0.0

0 comments on commit 5a9a8d7

Please sign in to comment.