airflow/providers/amazon/aws/example_dags/example_athena.py (91 changes: 34 additions & 57 deletions)

@@ -15,21 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.

-from datetime import datetime, timedelta
+from datetime import datetime
 from os import getenv

 from airflow import DAG
 from airflow.decorators import task
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.athena import AthenaOperator
+from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator, S3DeleteObjectsOperator
 from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

-# [START howto_operator_athena_env_variables]
-S3_BUCKET = getenv("S3_BUCKET", "test-bucket")
-S3_KEY = getenv("S3_KEY", "key")
-ATHENA_TABLE = getenv("ATHENA_TABLE", "test_table")
-ATHENA_DATABASE = getenv("ATHENA_DATABASE", "default")
-# [END howto_operator_athena_env_variables]
+S3_BUCKET = getenv('S3_BUCKET', 'test-bucket')
+S3_KEY = getenv('S3_KEY', 'athena-demo')
+ATHENA_TABLE = getenv('ATHENA_TABLE', 'test_table')
+ATHENA_DATABASE = getenv('ATHENA_DATABASE', 'default')

 SAMPLE_DATA = """"Alice",20
 "Bob",25
@@ -38,28 +37,12 @@
 SAMPLE_FILENAME = 'airflow_sample.csv'


-@task(task_id='setup__add_sample_data_to_s3')
-def add_sample_data_to_s3():
-    s3_hook = S3Hook()
-    s3_hook.load_string(SAMPLE_DATA, f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}', S3_BUCKET, replace=True)
-
-
-@task(task_id='teardown__remove_sample_data_from_s3')
-def remove_sample_data_from_s3():
-    s3_hook = S3Hook()
-    if s3_hook.check_for_key(f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}', S3_BUCKET):
-        s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}')
-
-
-@task(task_id='query__read_results_from_s3')
+@task
 def read_results_from_s3(query_execution_id):
     s3_hook = S3Hook()
-    if s3_hook.check_for_key(f'{S3_KEY}/{query_execution_id}.csv', S3_BUCKET):
-        file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET, Key=f'{S3_KEY}/{query_execution_id}.csv')
-        file_content = file_obj['Body'].read().decode('utf-8')
-        print(file_content)
-    else:
-        print('Could not find QueryExecutionId:', query_execution_id)
+    file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET, Key=f'{S3_KEY}/{query_execution_id}.csv')
+    file_content = file_obj['Body'].read().decode('utf-8')
+    print(file_content)


 QUERY_CREATE_TABLE = f"""
@@ -82,66 +65,60 @@ def read_results_from_s3(query_execution_id):
     dag_id='example_athena',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
-    dagrun_timeout=timedelta(minutes=60),
     tags=['example'],
     catchup=False,
 ) as dag:
-    # [START howto_athena_operator_and_sensor]
-
-    # Using a task-decorated function to create a CSV file in S3
-    add_sample_data_to_s3 = add_sample_data_to_s3()
+    upload_sample_data = S3CreateObjectOperator(
+        task_id='upload_sample_data',
+        s3_bucket=S3_BUCKET,
+        s3_key=f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}',
+        data=SAMPLE_DATA,
+        replace=True,
+    )

     create_table = AthenaOperator(
-        task_id='setup__create_table',
+        task_id='create_table',
         query=QUERY_CREATE_TABLE,
         database=ATHENA_DATABASE,
         output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-        sleep_time=30,
-        max_tries=None,
     )

+    # [START howto_athena_operator]
     read_table = AthenaOperator(
-        task_id='query__read_table',
+        task_id='read_table',
         query=QUERY_READ_TABLE,
         database=ATHENA_DATABASE,
         output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-        sleep_time=30,
-        max_tries=None,
     )
+    # [END howto_athena_operator]

-    get_read_state = AthenaSensor(
-        task_id='query__get_read_state',
+    # [START howto_athena_sensor]
+    await_query = AthenaSensor(
+        task_id='await_query',
         query_execution_id=read_table.output,
-        max_retries=None,
-        sleep_time=10,
     )
+    # [END howto_athena_sensor]

-    # Using a task-decorated function to read the results from S3
-    read_results_from_s3 = read_results_from_s3(read_table.output)
-
     drop_table = AthenaOperator(
-        task_id='teardown__drop_table',
+        task_id='drop_table',
         query=QUERY_DROP_TABLE,
         database=ATHENA_DATABASE,
         output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-        sleep_time=30,
-        max_tries=None,
     )

-    # Using a task-decorated function to delete the S3 file we created earlier
-    remove_sample_data_from_s3 = remove_sample_data_from_s3()
+    remove_s3_files = S3DeleteObjectsOperator(
+        task_id='remove_s3_files',
+        bucket=S3_BUCKET,
+        prefix=S3_KEY,
+    )

     (
-        add_sample_data_to_s3  # type: ignore
+        upload_sample_data
         >> create_table
         >> read_table
-        >> get_read_state
-        >> read_results_from_s3
+        >> await_query
+        >> read_results_from_s3(read_table.output)
         >> drop_table
-        >> remove_sample_data_from_s3
+        >> remove_s3_files
     )
-    # [END howto_athena_operator_and_sensor]

     # Task dependencies created via `XComArgs`:
-    # read_table >> get_read_state
+    # read_table >> await_query
     # read_table >> read_results_from_s3
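The `XComArgs` comment above refers to Airflow's automatic dependency inference: referencing `read_table.output` in another task's arguments passes the query execution ID at runtime and registers `read_table` as an upstream task. A minimal sketch of the idea, where the dag_id, query, and S3 values are placeholders and not part of this PR:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

with DAG(dag_id='athena_xcomarg_sketch', schedule_interval=None, start_date=datetime(2021, 1, 1)) as dag:
    read_table = AthenaOperator(
        task_id='read_table',
        query='SELECT * FROM test_table',       # placeholder query
        database='default',                     # placeholder database
        output_location='s3://test-bucket/athena-demo',  # placeholder bucket
    )

    # `read_table.output` is an XComArg wrapping the operator's return value
    # (the Athena query execution ID). Using it as an argument both feeds the
    # value to the sensor at runtime and creates read_table >> await_query,
    # so no explicit `>>` between these two tasks is required.
    await_query = AthenaSensor(
        task_id='await_query',
        query_execution_id=read_table.output,
    )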
airflow/providers/amazon/aws/sensors/athena.py (5 changes: 5 additions & 0 deletions)

@@ -36,6 +36,11 @@ class AthenaSensor(BaseSensorOperator):
     Asks for the state of the Query until it reaches a failure state or success state.
     If the query fails, the task will fail.

+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AthenaSensor`
+
+
     :param query_execution_id: query_execution_id to check the state of
     :param max_retries: Number of times to poll for query state before
         returning the current state, defaults to None
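The parameters documented above work together as follows. A minimal usage sketch, inside a `with DAG(...)` block, assuming a hard-coded query execution ID purely for illustration (in the example DAG it comes from `read_table.output`):

from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

# Poll the Athena query every 10 seconds until it reaches a terminal state.
# max_retries=None means the sensor never gives up after a fixed number of
# polls; a failed or cancelled query still fails the task.
await_query = AthenaSensor(
    task_id='await_query',
    query_execution_id='00000000-0000-0000-0000-000000000000',  # illustrative placeholder
    max_retries=None,
    sleep_time=10,
)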
docs/apache-airflow-providers-amazon/operators/athena.rst (66 changes: 41 additions & 25 deletions)

@@ -16,42 +16,58 @@
     under the License.


-.. _howto/operator:AthenaOperator:
-
-Amazon Athena Operator
-======================
+Amazon Athena Operators
+=======================

-.. contents::
-  :depth: 1
-  :local:
+`Amazon Athena <https://aws.amazon.com/athena/>`__ is an interactive query service
+that makes it easy to analyze data in Amazon Simple Storage Service (S3) using
+standard SQL. Athena is serverless, so there is no infrastructure to set up or
+manage, and you pay only for the queries you run. To get started, simply point
+to your data in S3, define the schema, and start querying using standard SQL.

 Prerequisite Tasks
-------------------
+^^^^^^^^^^^^^^^^^^

 .. include:: _partials/prerequisite_tasks.rst

-Using Operator
---------------
-Use the
-:class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator`
-to run a query in Amazon Athena. To get started with Amazon Athena please visit
-`aws.amazon.com/athena <https://aws.amazon.com/athena>`_
+.. _howto/operator:AthenaOperator:
+
+Athena Operator
+^^^^^^^^^^^^^^^

-In the following example, we create an Athena table and run a query based upon a CSV file
-created in an S3 bucket and populated with SAMPLE_DATA. The example waits for the query
-to complete and then drops the created table and deletes the sample CSV file in the S3
-bucket.
+Use the :class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator`
+to run a query in Amazon Athena.
+
+
+In the following example, we query an existing Athena table and send the results to
+an existing Amazon S3 bucket. For more examples of how to use this operator, please
+see the `Sample DAG <https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/example_dags/example_athena.py>`__.

 .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_athena.py
     :language: python
-    :start-after: [START howto_athena_operator_and_sensor]
-    :end-before: [END howto_athena_operator_and_sensor]
+    :start-after: [START howto_athena_operator]
+    :dedent: 4
+    :end-before: [END howto_athena_operator]
+
+.. _howto/operator:AthenaSensor:
+
+Athena Sensor
+^^^^^^^^^^^^^
+
+Use the :class:`~airflow.providers.amazon.aws.sensors.athena.AthenaSensor`
+to wait for the results of a query in Amazon Athena.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_athena.py
+    :language: python
+    :start-after: [START howto_athena_sensor]
+    :dedent: 4
+    :end-before: [END howto_athena_sensor]

-More information
-----------------
+Reference
+^^^^^^^^^

-For further information, look at the documentation of :meth:`~Athena.Client.start_query_execution` method
-in `boto3`_.
+For further information, look at:

-.. _boto3: https://pypi.org/project/boto3/
+* `Boto3 Library Documentation for Athena <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html>`__
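For readers who want to see what the operator and sensor wrap, a rough boto3 equivalent of submitting a query, with illustrative database and bucket names: the operator pushes the returned QueryExecutionId to XCom, and the sensor polls that ID until the query finishes.

import boto3

# Submit a query to Athena. The returned QueryExecutionId is the value
# AthenaOperator returns (and pushes to XCom) and AthenaSensor waits on.
client = boto3.client('athena')
response = client.start_query_execution(
    QueryString='SELECT * FROM test_table',                          # placeholder query
    QueryExecutionContext={'Database': 'default'},                   # placeholder database
    ResultConfiguration={'OutputLocation': 's3://test-bucket/athena-demo'},  # placeholder bucket
)
print(response['QueryExecutionId'])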