-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Add option to wait for completion on the EmrCreateJobFlowOperator #28827
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0da5f4d
8d238f2
c09b56a
e0a6b93
bc1cc72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -523,6 +523,12 @@ class EmrCreateJobFlowOperator(BaseOperator): | |||||
| :param job_flow_overrides: boto3 style arguments or reference to an arguments file | ||||||
| (must be '.json') to override specific ``emr_conn_id`` extra parameters. (templated) | ||||||
| :param region_name: Region named passed to EmrHook | ||||||
| :param wait_for_completion: Whether to finish task immediately after creation (False) or wait for jobflow | ||||||
| completion (True) | ||||||
| :param waiter_countdown: Max. seconds to wait for jobflow completion (only in combination with | ||||||
| wait_for_completion=True, None = no limit) | ||||||
| :param waiter_check_interval_seconds: Number of seconds between polling the jobflow state. Defaults to 60 | ||||||
| seconds. | ||||||
| """ | ||||||
|
|
||||||
| template_fields: Sequence[str] = ("job_flow_overrides",) | ||||||
|
|
@@ -538,42 +544,76 @@ def __init__( | |||||
| emr_conn_id: str | None = "emr_default", | ||||||
| job_flow_overrides: str | dict[str, Any] | None = None, | ||||||
| region_name: str | None = None, | ||||||
| wait_for_completion: bool = False, | ||||||
| waiter_countdown: int | None = None, | ||||||
| waiter_check_interval_seconds: int = 60, | ||||||
| **kwargs, | ||||||
| ): | ||||||
| super().__init__(**kwargs) | ||||||
| self.aws_conn_id = aws_conn_id | ||||||
| self.emr_conn_id = emr_conn_id | ||||||
| self.job_flow_overrides = job_flow_overrides or {} | ||||||
| self.region_name = region_name | ||||||
| self.wait_for_completion = wait_for_completion | ||||||
| self.waiter_countdown = waiter_countdown | ||||||
| self.waiter_check_interval_seconds = waiter_check_interval_seconds | ||||||
|
|
||||||
| self._job_flow_id: str | None = None | ||||||
|
|
||||||
| def execute(self, context: Context) -> str: | ||||||
| emr = EmrHook( | ||||||
| @cached_property | ||||||
| def _emr_hook(self) -> EmrHook: | ||||||
| """Create and return an EmrHook.""" | ||||||
| return EmrHook( | ||||||
| aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name | ||||||
| ) | ||||||
|
|
||||||
| def execute(self, context: Context) -> str | None: | ||||||
| self.log.info( | ||||||
| "Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s", self.aws_conn_id, self.emr_conn_id | ||||||
| "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id | ||||||
| ) | ||||||
| if isinstance(self.job_flow_overrides, str): | ||||||
| job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides) | ||||||
| self.job_flow_overrides = job_flow_overrides | ||||||
| else: | ||||||
| job_flow_overrides = self.job_flow_overrides | ||||||
| response = emr.create_job_flow(job_flow_overrides) | ||||||
| response = self._emr_hook.create_job_flow(job_flow_overrides) | ||||||
|
|
||||||
| if not response["ResponseMetadata"]["HTTPStatusCode"] == 200: | ||||||
| raise AirflowException(f"JobFlow creation failed: {response}") | ||||||
| raise AirflowException(f"Job flow creation failed: {response}") | ||||||
| else: | ||||||
| job_flow_id = response["JobFlowId"] | ||||||
| self.log.info("JobFlow with id %s created", job_flow_id) | ||||||
| self._job_flow_id = response["JobFlowId"] | ||||||
| self.log.info("Job flow with id %s created", self._job_flow_id) | ||||||
| EmrClusterLink.persist( | ||||||
| context=context, | ||||||
| operator=self, | ||||||
| region_name=emr.conn_region_name, | ||||||
| aws_partition=emr.conn_partition, | ||||||
| job_flow_id=job_flow_id, | ||||||
| region_name=self._emr_hook.conn_region_name, | ||||||
| aws_partition=self._emr_hook.conn_partition, | ||||||
| job_flow_id=self._job_flow_id, | ||||||
| ) | ||||||
| return job_flow_id | ||||||
|
|
||||||
| if self.wait_for_completion: | ||||||
| # Didn't use a boto-supplied waiter because those don't support waiting for WAITING state. | ||||||
| # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#waiters | ||||||
| waiter( | ||||||
| get_state_callable=self._emr_hook.get_conn().describe_cluster, | ||||||
|
Comment on lines
+597
to
+598
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Glad to see someone using this already to create new customer waiters! 🤩
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, this isn't the waiter setup I thought it was originally (thanks @ferruzzi for pointing that out!). You can find details on the new custom waiters here. Though I'm actually happy to merge this PR with the waiter you used, and then move all of the EMR waiters to the new waiter system in another PR rather than scope creeping this one.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I didn't realize you could actually implement custom waiters that way, hence my comment above: https://github.com/BasPH/airflow/blob/add-emrcreatejobflow-waitforcompletion/airflow/providers/amazon/aws/operators/emr.py#L595-L596. I'll take a look. |
||||||
| get_state_args={"ClusterId": self._job_flow_id}, | ||||||
| parse_response=["Cluster", "Status", "State"], | ||||||
| # Cluster will be in WAITING after finishing if KeepJobFlowAliveWhenNoSteps is True | ||||||
| desired_state={"WAITING", "TERMINATED"}, | ||||||
| failure_states={"TERMINATED_WITH_ERRORS"}, | ||||||
| object_type="job flow", | ||||||
| action="finished", | ||||||
| countdown=self.waiter_countdown, | ||||||
| check_interval_seconds=self.waiter_check_interval_seconds, | ||||||
| ) | ||||||
|
|
||||||
| return self._job_flow_id | ||||||
|
|
||||||
| def on_kill(self) -> None: | ||||||
| """Terminate job flow.""" | ||||||
| if self._job_flow_id: | ||||||
| self.log.info("Terminating job flow %s", self._job_flow_id) | ||||||
| self._emr_hook.terminate_job_flow(self._job_flow_id) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @BasPH, thoughts on this suggested change? Otherwise the PR looks good |
||||||
|
|
||||||
|
|
||||||
| class EmrModifyClusterOperator(BaseOperator): | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We generally try to avoid functions in hooks which just wrap boto3 api. You can call the boto3 api directly from the operator