Merge pull request #3 from astronomer/dev
globe telecom demo stuff
virajmparekh authored May 5, 2020
2 parents 9619fe6 + e8b7fee commit a96bc22
Showing 5 changed files with 333 additions and 0 deletions.
23 changes: 23 additions & 0 deletions .drone.yml
@@ -21,3 +21,26 @@ pipeline:
    when:
      event: push
      branch: [ master, release-* ]

  build:
    image: astronomerio/ap-build:0.0.7
    commands:
      - docker build -t registry.democluster.astronomer-trials.com/meteoric-cosmic-6397/airflow:ci-${DRONE_BUILD_NUMBER} .
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    when:
      event: push
      branch: [ dev, release-* ]

  push:
    image: astronomerio/ap-build:0.0.7
    commands:
      - echo $${ASTRO_API_KEY}
      - docker login registry.democluster.astronomer-trials.com -u _ -p $${ASTRO_API_KEY}
      - docker push registry.democluster.astronomer-trials.com/meteoric-cosmic-6397/airflow:ci-${DRONE_BUILD_NUMBER}
    secrets: [ astro_api_key ]
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    when:
      event: push
      branch: [ dev, release-* ]
88 changes: 88 additions & 0 deletions dags/databricks_sample.py
@@ -0,0 +1,88 @@
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from datetime import timedelta, datetime

# Default arguments
args = {
    'owner': 'Airflow',
    'email': ['viraj@astronomer.io'],
    'email_on_failure': True,
    'depends_on_past': False,
    'databricks_conn_id': 'adb_workspace'
}

S3_CONN_ID = 'astro-s3-workshop'
BUCKET = 'astro-workshop-bucket'


# Job data: one Databricks job plus a follow-up Postgres query per team
job_info = [
    {
        'job_id': 'spark_job_one',
        'config': {
            'notebook_params': {
                'inPath': '/teams/team_one'
            }
        },
        'query': "SELECT * FROM table_one"
    },
    {
        'job_id': 'spark_job_two',
        'config': {
            'notebook_params': {
                'inPath': '/teams/team_two'
            }
        },
        'query': "SELECT * FROM table_two"
    },
    {
        'job_id': 'spark_job_three',
        'config': {
            'notebook_params': {
                'inPath': '/teams/team_three'
            }
        },
        'query': "SELECT * FROM table_three"
    },
    {
        'job_id': 'spark_job_four',
        'config': {
            'notebook_params': {
                'inPath': '/teams/team_three'
            }
        },
        'query': "SELECT * FROM table_four"
    }
]

with DAG(dag_id='adb_pipeline',
         default_args=args,
         start_date=datetime(2019, 1, 1),
         schedule_interval='30 4 * * *',
         catchup=False) as dag:

    t1 = DummyOperator(task_id='kick_off_dag')

    t2 = S3KeySensor(
        task_id='check_for_file',
        bucket_key='globetelecom/copy_*',
        poke_interval=45,
        timeout=600,
        wildcard_match=True,
        bucket_name=BUCKET,
        aws_conn_id=S3_CONN_ID)

    # Each entry in job_info fans out into its own Databricks run followed by a Postgres query
    for job in job_info:
        spark = DatabricksRunNowOperator(
            task_id=job['job_id'],
            job_id=job['job_id'],
            json=job['config'])

        query = PostgresOperator(
            task_id='post_{0}_query'.format(job['job_id']),
            sql=job['query'],
            postgres_conn_id='prod_postgres'
        )

        t1 >> t2 >> spark >> query
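
The DAG above assumes two pre-existing Airflow connections: adb_workspace (referenced through databricks_conn_id in args) and prod_postgres for the follow-up queries. Below is a minimal sketch, not part of this commit, of registering the Databricks connection against the metadata database of the Airflow 1.10 series these imports come from; the workspace URL and personal access token are placeholders, and in practice the same connection is usually created from the Airflow UI (Admin, Connections) or the CLI. Note also that DatabricksRunNowOperator's job_id refers to an existing Databricks job, so the descriptive strings in job_info read as stand-ins for real job IDs.

# Sketch only (not in this commit): register the 'adb_workspace' connection the DAG expects.
# Host and token are placeholders for your own Databricks workspace.
import json

from airflow import settings
from airflow.models import Connection

session = settings.Session()
session.add(Connection(
    conn_id='adb_workspace',
    conn_type='databricks',
    host='https://<your-workspace>.cloud.databricks.com',   # placeholder workspace URL
    extra=json.dumps({'token': '<personal-access-token>'}),  # placeholder access token
))
session.commit()
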
76 changes: 76 additions & 0 deletions dags/s3_upload_copy_dag.py
@@ -0,0 +1,76 @@
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_copy_object_operator import S3CopyObjectOperator
from airflow.hooks.S3_hook import S3Hook
from datetime import datetime, timedelta


S3_CONN_ID = 'astro-s3-workshop'
BUCKET = 'astro-workshop-bucket'

name = 'viraj'  # swap your name here


def on_failure_function(context):
    """
    Insert your failure logic here, e.g. a notification function.
    Airflow passes the task context dict to this callback when the task fails.
    """


def upload_to_s3(file_name):

    # Instantiate the S3 hook
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)

    # Create a small local file to upload
    sample_file = "{0}_file_{1}.txt".format(name, file_name)  # swap your name here
    with open(sample_file, "w+") as example_file:
        example_file.write("Putting some data in for task {0}".format(file_name))

    s3_hook.load_file(sample_file, 'globetelecom/{0}'.format(sample_file), bucket_name=BUCKET, replace=True)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


# Using a DAG context manager, you don't have to specify the dag argument on each task
with DAG('s3_upload_copy',
         start_date=datetime(2019, 1, 1),
         max_active_runs=1,
         schedule_interval='0 12 8-14,22-28 * 6',  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
         default_args=default_args,
         catchup=False  # don't backfill historical DAG runs
         ) as dag:

    t0 = DummyOperator(task_id='start')

    for i in range(0, 10):  # generates 10 upload/copy task pairs
        generate_files = PythonOperator(
            task_id='generate_file_{0}_{1}'.format(name, i),  # note the task id is dynamic
            python_callable=upload_to_s3,
            on_failure_callback=on_failure_function,
            op_kwargs={'file_name': i}
        )

        copy_files = S3CopyObjectOperator(
            task_id='copy_{0}_file_{1}'.format(name, i),
            source_bucket_key='globetelecom/{0}_file_{1}.txt'.format(name, i),
            dest_bucket_key='globetelecom/copy_{0}_file_{1}.txt'.format(name, i),
            source_bucket_name=BUCKET,
            dest_bucket_name=BUCKET,
            aws_conn_id=S3_CONN_ID
        )

        t0 >> generate_files >> copy_files  # keep this inside the scope of the loop
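
The on_failure_function above is deliberately left as a stub for attendees to fill in. One illustrative implementation, not part of this commit, simply logs the failing task using the context dict Airflow passes to failure callbacks; a Slack or email notification would follow the same shape.

# Illustrative only: log the failing task from the callback context.
import logging


def on_failure_function(context):
    ti = context['task_instance']
    logging.error(
        "Task %s in DAG %s failed for execution date %s (log: %s)",
        ti.task_id, ti.dag_id, context['execution_date'], ti.log_url,
    )
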
85 changes: 85 additions & 0 deletions dags/s3_upload_copy_delete.py
@@ -0,0 +1,85 @@
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_copy_object_operator import S3CopyObjectOperator
from airflow.contrib.operators.s3_delete_objects_operator import S3DeleteObjectsOperator
from airflow.hooks.S3_hook import S3Hook
from datetime import datetime, timedelta


S3_CONN_ID = 'astro-s3-workshop'
BUCKET = 'astro-workshop-bucket'

name = 'viraj'  # swap your name here


def upload_to_s3(file_name):

    # Instantiate the S3 hook
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)

    # Create a small local file to upload
    sample_file = "{0}_file_{1}.txt".format(name, file_name)  # swap your name here
    with open(sample_file, "w+") as example_file:
        example_file.write("Putting some data in for task {0}".format(file_name))

    s3_hook.load_file(sample_file, 'globetelecom/{0}'.format(sample_file), bucket_name=BUCKET, replace=True)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Per-task resources for the KubernetesExecutor (defined here but not attached to any task below)
test_config2 = {"KubernetesExecutor": {
    "request_memory": "400Mi",
    "limit_memory": "400Mi",
    "request_cpu": "400m",
    "limit_cpu": "400m"}}


# Using a DAG context manager, you don't have to specify the dag argument on each task
with DAG('s3_upload_copy_delete',
         start_date=datetime(2019, 1, 1),
         max_active_runs=1,
         schedule_interval='0 12 8-14,22-28 * 6',  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
         default_args=default_args,
         catchup=False  # don't backfill historical DAG runs
         ) as dag:

    t0 = DummyOperator(task_id='start')

    for i in range(0, 10):  # generates 10 upload/copy/delete task chains
        generate_files = PythonOperator(
            task_id='generate_file_{0}_{1}'.format(name, i),  # note the task id is dynamic
            python_callable=upload_to_s3,
            op_kwargs={'file_name': i}
        )

        copy_files = S3CopyObjectOperator(
            task_id='copy_{0}_file_{1}'.format(name, i),
            # note: this source key differs from the object uploaded by upload_to_s3 above
            source_bucket_key='globetelecom/{0}_file_{1}_testfile_exist.txt'.format(name, i),
            dest_bucket_key='globetelecom/copy_{0}_file_{1}.txt'.format(name, i),
            source_bucket_name=BUCKET,
            dest_bucket_name=BUCKET,
            aws_conn_id=S3_CONN_ID
        )

        delete_files = S3DeleteObjectsOperator(
            task_id='delete_{0}_file_{1}'.format(name, i),
            keys='globetelecom_copy/{0}_file_{1}'.format(name, i),
            bucket=BUCKET,
            aws_conn_id=S3_CONN_ID
        )

        t0 >> generate_files >> copy_files >> delete_files  # keep this inside the scope of the loop
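
test_config2 is defined in this file but never attached to a task. If this deployment ran on the KubernetesExecutor, a sketch of applying those resource requests and limits to one of the generated tasks (a change that is not part of this commit) would look like:

# Sketch only: pass the resource dict defined above via executor_config so the
# KubernetesExecutor applies these requests/limits to this task's worker pod.
generate_files = PythonOperator(
    task_id='generate_file_{0}_{1}'.format(name, i),
    python_callable=upload_to_s3,
    op_kwargs={'file_name': i},
    executor_config=test_config2,  # ignored by executors other than the KubernetesExecutor
)
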
61 changes: 61 additions & 0 deletions dags/s3_upload_dag.py
@@ -0,0 +1,61 @@
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from airflow.hooks.S3_hook import S3Hook
from datetime import datetime, timedelta


S3_CONN_ID = 'astro-s3-workshop'
BUCKET = 'astro-workshop-bucket'

name = 'viraj'  # swap your name here


def upload_to_s3(file_name):

    # Instantiate the S3 hook
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)

    # Create a small local file to upload
    sample_file = "{0}_file_{1}.txt".format(name, file_name)  # swap your name here
    with open(sample_file, "w+") as example_file:
        example_file.write("Putting some data in for task {0}".format(file_name))

    s3_hook.load_file(sample_file, 'globetelecom/{0}'.format(sample_file), bucket_name=BUCKET, replace=True)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


# Using a DAG context manager, you don't have to specify the dag argument on each task
with DAG('s3_upload',
         start_date=datetime(2019, 1, 1),
         max_active_runs=1,
         schedule_interval='0 12 8-14,22-28 * 6',  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
         default_args=default_args,
         catchup=False  # don't backfill historical DAG runs
         ) as dag:

    t0 = DummyOperator(task_id='start')

    for i in range(0, 5):  # generates 5 tasks
        generate_files = PythonOperator(
            task_id='generate_file_{0}_{1}'.format(name, i),  # task id is generated dynamically
            python_callable=upload_to_s3,
            op_kwargs={'file_name': i}
        )

        t0 >> generate_files
