Skip to content

Commit 46c61ed

Browse files
authored
EMR Sample DAG and Docs Update (#22189)
1 parent 152539f commit 46c61ed

File tree

5 files changed

+174
-100
lines changed

5 files changed

+174
-100
lines changed

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18-
"""
19-
This is an example dag for a AWS EMR Pipeline with auto steps.
20-
"""
21-
from datetime import datetime, timedelta
18+
import os
19+
from datetime import datetime
2220

2321
from airflow import DAG
2422
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
2523
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
2624

25+
JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole')
26+
SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole')
27+
2728
# [START howto_operator_emr_automatic_steps_config]
2829
SPARK_STEPS = [
2930
{
@@ -44,38 +45,40 @@
4445
'InstanceGroups': [
4546
{
4647
'Name': 'Primary node',
47-
'Market': 'SPOT',
48+
'Market': 'ON_DEMAND',
4849
'InstanceRole': 'MASTER',
49-
'InstanceType': 'm1.medium',
50+
'InstanceType': 'm5.xlarge',
5051
'InstanceCount': 1,
51-
}
52+
},
5253
],
5354
'KeepJobFlowAliveWhenNoSteps': False,
5455
'TerminationProtected': False,
5556
},
5657
'Steps': SPARK_STEPS,
57-
'JobFlowRole': 'EMR_EC2_DefaultRole',
58-
'ServiceRole': 'EMR_DefaultRole',
58+
'JobFlowRole': JOB_FLOW_ROLE,
59+
'ServiceRole': SERVICE_ROLE,
5960
}
6061
# [END howto_operator_emr_automatic_steps_config]
6162

63+
6264
with DAG(
63-
dag_id='emr_job_flow_automatic_steps_dag',
64-
dagrun_timeout=timedelta(hours=2),
65+
dag_id='example_emr_job_flow_automatic_steps',
66+
schedule_interval=None,
6567
start_date=datetime(2021, 1, 1),
66-
schedule_interval='0 3 * * *',
67-
catchup=False,
6868
tags=['example'],
69+
catchup=False,
6970
) as dag:
7071

71-
# [START howto_operator_emr_automatic_steps_tasks]
72+
# [START howto_operator_emr_create_job_flow]
7273
job_flow_creator = EmrCreateJobFlowOperator(
7374
task_id='create_job_flow',
7475
job_flow_overrides=JOB_FLOW_OVERRIDES,
7576
)
77+
# [END howto_operator_emr_create_job_flow]
7678

77-
job_sensor = EmrJobFlowSensor(task_id='check_job_flow', job_flow_id=job_flow_creator.output)
78-
# [END howto_operator_emr_automatic_steps_tasks]
79-
80-
# Task dependency created via `XComArgs`:
81-
# job_flow_creator >> job_sensor
79+
# [START howto_sensor_emr_job_flow_sensor]
80+
job_sensor = EmrJobFlowSensor(
81+
task_id='check_job_flow',
82+
job_flow_id=job_flow_creator.output,
83+
)
84+
# [END howto_sensor_emr_job_flow_sensor]

airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,21 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18-
"""
19-
This is an example dag for a AWS EMR Pipeline.
20-
21-
Starting by creating a cluster, adding steps/operations, checking steps and finally when finished
22-
terminating the cluster.
23-
"""
24-
from datetime import datetime, timedelta
18+
import os
19+
from datetime import datetime
2520

2621
from airflow import DAG
22+
from airflow.models.baseoperator import chain
2723
from airflow.providers.amazon.aws.operators.emr import (
2824
EmrAddStepsOperator,
2925
EmrCreateJobFlowOperator,
3026
EmrTerminateJobFlowOperator,
3127
)
3228
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
3329

30+
JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole')
31+
SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole')
32+
3433
SPARK_STEPS = [
3534
{
3635
'Name': 'calculate_pi',
@@ -50,54 +49,58 @@
5049
'InstanceGroups': [
5150
{
5251
'Name': 'Primary node',
53-
'Market': 'SPOT',
52+
'Market': 'ON_DEMAND',
5453
'InstanceRole': 'MASTER',
55-
'InstanceType': 'm1.medium',
54+
'InstanceType': 'm5.xlarge',
5655
'InstanceCount': 1,
57-
}
56+
},
5857
],
59-
'KeepJobFlowAliveWhenNoSteps': True,
58+
'KeepJobFlowAliveWhenNoSteps': False,
6059
'TerminationProtected': False,
6160
},
62-
'JobFlowRole': 'EMR_EC2_DefaultRole',
63-
'ServiceRole': 'EMR_DefaultRole',
61+
'JobFlowRole': JOB_FLOW_ROLE,
62+
'ServiceRole': SERVICE_ROLE,
6463
}
6564

65+
6666
with DAG(
67-
dag_id='emr_job_flow_manual_steps_dag',
68-
dagrun_timeout=timedelta(hours=2),
67+
dag_id='example_emr_job_flow_manual_steps',
68+
schedule_interval=None,
6969
start_date=datetime(2021, 1, 1),
70-
schedule_interval='0 3 * * *',
71-
catchup=False,
7270
tags=['example'],
71+
catchup=False,
7372
) as dag:
7473

75-
# [START howto_operator_emr_manual_steps_tasks]
7674
cluster_creator = EmrCreateJobFlowOperator(
7775
task_id='create_job_flow',
7876
job_flow_overrides=JOB_FLOW_OVERRIDES,
7977
)
8078

79+
# [START howto_operator_emr_add_steps]
8180
step_adder = EmrAddStepsOperator(
8281
task_id='add_steps',
8382
job_flow_id=cluster_creator.output,
8483
steps=SPARK_STEPS,
8584
)
85+
# [END howto_operator_emr_add_steps]
8686

87+
# [START howto_sensor_emr_step_sensor]
8788
step_checker = EmrStepSensor(
8889
task_id='watch_step',
8990
job_flow_id=cluster_creator.output,
9091
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
9192
)
93+
# [END howto_sensor_emr_step_sensor]
9294

95+
# [START howto_operator_emr_terminate_job_flow]
9396
cluster_remover = EmrTerminateJobFlowOperator(
94-
task_id='remove_cluster', job_flow_id=cluster_creator.output
97+
task_id='remove_cluster',
98+
job_flow_id=cluster_creator.output,
9599
)
100+
# [END howto_operator_emr_terminate_job_flow]
96101

97-
step_adder >> step_checker >> cluster_remover
98-
# [END howto_operator_emr_manual_steps_tasks]
99-
100-
# Task dependencies created via `XComArgs`:
101-
# cluster_creator >> step_adder
102-
# cluster_creator >> step_checker
103-
# cluster_creator >> cluster_remover
102+
chain(
103+
step_adder,
104+
step_checker,
105+
cluster_remover,
106+
)

airflow/providers/amazon/aws/operators/emr.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ class EmrAddStepsOperator(BaseOperator):
4242
"""
4343
An operator that adds steps to an existing EMR job_flow.
4444
45+
.. seealso::
46+
For more information on how to use this operator, take a look at the guide:
47+
:ref:`howto/operator:EmrAddStepsOperator`
48+
4549
:param job_flow_id: id of the JobFlow to add steps to. (templated)
4650
:param job_flow_name: name of the JobFlow to add steps to. Use as an alternative to passing
4751
job_flow_id. will search for id of JobFlow with matching name in one of the states in
@@ -264,6 +268,10 @@ class EmrCreateJobFlowOperator(BaseOperator):
264268
A dictionary of JobFlow overrides can be passed that override
265269
the config from the connection.
266270
271+
.. seealso::
272+
For more information on how to use this operator, take a look at the guide:
273+
:ref:`howto/operator:EmrCreateJobFlowOperator`
274+
267275
:param aws_conn_id: aws connection to uses
268276
:param emr_conn_id: emr connection to use
269277
:param job_flow_overrides: boto3 style arguments or reference to an arguments file
@@ -320,6 +328,11 @@ def execute(self, context: 'Context') -> str:
320328
class EmrModifyClusterOperator(BaseOperator):
321329
"""
322330
An operator that modifies an existing EMR cluster.
331+
332+
.. seealso::
333+
For more information on how to use this operator, take a look at the guide:
334+
:ref:`howto/operator:EmrModifyClusterOperator`
335+
323336
:param cluster_id: cluster identifier
324337
:param step_concurrency_level: Concurrency of the cluster
325338
:param aws_conn_id: aws connection to uses
@@ -364,6 +377,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
364377
"""
365378
Operator to terminate EMR JobFlows.
366379
380+
.. seealso::
381+
For more information on how to use this operator, take a look at the guide:
382+
:ref:`howto/operator:EmrTerminateJobFlowOperator`
383+
367384
:param job_flow_id: id of the JobFlow to terminate. (templated)
368385
:param aws_conn_id: aws connection to uses
369386
"""

airflow/providers/amazon/aws/sensors/emr.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ class EmrContainerSensor(BaseSensorOperator):
122122
Asks for the state of the job run until it reaches a failure state or success state.
123123
If the job run fails, the task will fail.
124124
125+
.. seealso::
126+
For more information on how to use this sensor, take a look at the guide:
127+
:ref:`howto/sensor:EmrContainerSensor`
128+
125129
:param job_id: job_id to check the state of
126130
:param max_retries: Number of times to poll for query state before
127131
returning the current state, defaults to None
@@ -189,6 +193,10 @@ class EmrJobFlowSensor(EmrBaseSensor):
189193
When target_states is set to ['RUNNING', 'WAITING'] sensor waits
190194
until job flow to be ready (after 'STARTING' and 'BOOTSTRAPPING' states)
191195
196+
.. seealso::
197+
For more information on how to use this sensor, take a look at the guide:
198+
:ref:`howto/sensor:EmrJobFlowSensor`
199+
192200
:param job_flow_id: job_flow_id to check the state of
193201
:param target_states: the target states, sensor waits until
194202
job flow reaches any of these states
@@ -263,6 +271,10 @@ class EmrStepSensor(EmrBaseSensor):
263271
264272
With the default target states, sensor waits step to be completed.
265273
274+
.. seealso::
275+
For more information on how to use this sensor, take a look at the guide:
276+
:ref:`howto/sensor:EmrStepSensor`
277+
266278
:param job_flow_id: job_flow_id which contains the step check the state of
267279
:param step_id: step to check the state of
268280
:param target_states: the target states, sensor waits until

0 commit comments

Comments
 (0)