-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add EMR Serverless Operators and Hooks #25324
Add EMR Serverless Operators and Hooks #25324
Conversation
Co-Author: https://github.com/dacort
…as params. Add test to check timeout exception
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
|
@ferruzzi @o-nikolas @vincbeck -> WDYT? |
Syed is a new member of our team, we already reviewed his work :) Thanks for reaching out though |
aha :) Good to know. |
Some errors to fix :) |
**kwargs: Any, | ||
) -> None: | ||
self.aws_conn_id = aws_conn_id | ||
self.emr_conn_id = emr_conn_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could easily be missing it, but where is emr_conn_id
being used in this sensor and other sensors in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. It seems emr_conn_id
is not used. You can just remove it
**kwargs: Any, | ||
) -> None: | ||
self.aws_conn_id = aws_conn_id | ||
self.emr_conn_id = emr_conn_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. It seems emr_conn_id
is not used. You can just remove it
|
||
try: | ||
response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id) | ||
except Exception: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall question. Do we want or is there any value converting the Exception
to AirflowException
? I can see there is a lot of operators which do not do that. If there is no value, I would rather not doing the conversion just for the sake of having a code cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with Exception
just as a catchall for any type of exception that might be thrown by the boto3 API. I'm not sure if there's an advantage to throwing an AirflowException
instead of a generic Exception
. Only thing I can think of is that it would make it clear that the exception was coming from Airflow, rather than any underlying API calls (i.e. boto3) that might have been made. But that can also be understood from the message associated with the Exception
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think AirflowException
is used too broadly in the code base. IMO AirflowException
, and its subclasses, are really related to Airflow orchestration activity rather than activity done by what Airflow is orchestrating. It might make more sense to drop the try/except
and bubble up whatever API call errors that might be thrown directly. Up to you though.
docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
Outdated
Show resolved
Hide resolved
Closes #20215 |
some errors need fixing. |
- Import cached_property directly functools - Fix indentation in doc string - Define default values in doc string explicitly - Remove unnecessary references to emr_conn_id - Replace print messages with log messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor suggestions but overall LGTM (after fixing the tests of course).
.. _howto/operator:EmrServerlessCreateApplicationOperator: | ||
|
||
Create an EMR Serverless Application | ||
========================== |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This underline needs to be at least the length of the text it's underlining. I imagine this might be a Build Docs failure for you.
self.log.info('Starting job on Application: %s', self.application_id) | ||
|
||
app_state = self.hook.conn.get_application(applicationId=self.application_id)['application']['state'] | ||
if app_state not in {'CREATED', 'STARTED'}: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if app_state not in {'CREATED', 'STARTED'}: | |
if app_state not in EmrServerlessApplicationSensor.SUCCESS_STATES: |
Small suggestion to use the attrs where you can like you have in other places. Up to you though (assuming it's functionally correct as well).
|
||
try: | ||
response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id) | ||
except Exception: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think AirflowException
is used too broadly in the code base. IMO AirflowException
, and its subclasses, are really related to Airflow orchestration activity rather than activity done by what Airflow is orchestrating. It might make more sense to drop the try/except
and bubble up whatever API call errors that might be thrown directly. Up to you though.
- Fixed minor formatting in emr_serverless.rst
@@ -0,0 +1,97 @@ | |||
# Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new example dags should be according to AIP-47
Example: #25205
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point but the plan was to do it in two iterations since this PR is already big enough. Once this PR is merged, @syedahsn will work on converting this sample dag to system test. If you strongly disagree, I guess the two can be done in this PR but it will just make it bigger and harder to review
- Minor formatting in emr_serverless.rst
Closes: #20215
Define new operators, as well as necessary hooks and sensors, to run EMR Serverless jobs. System test for EMR Serverless will be coming in a separate PR.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.