
Add EMR Serverless Operators and Hooks #25324

Merged on Aug 5, 2022 (25 commits)

Conversation

syedahsn
Contributor

@syedahsn syedahsn commented Jul 26, 2022

Closes: #20215

Define new operators, as well as necessary hooks and sensors, to run EMR Serverless jobs. System test for EMR Serverless will be coming in a separate PR.



@boring-cyborg

boring-cyborg bot commented Jul 26, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@potiuk
Member

potiuk commented Jul 28, 2022

@ferruzzi @o-nikolas @vincbeck -> WDYT?

@vincbeck
Contributor

Syed is a new member of our team, we already reviewed his work :) Thanks for reaching out though

@potiuk
Member

potiuk commented Jul 28, 2022

> Syed is a new member of our team, we already reviewed his work :) Thanks for reaching out though

aha :) Good to know.

@potiuk
Member

potiuk commented Jul 29, 2022

Some errors to fix :)

airflow/providers/amazon/aws/sensors/emr.py
        **kwargs: Any,
    ) -> None:
        self.aws_conn_id = aws_conn_id
        self.emr_conn_id = emr_conn_id
Contributor

I could easily be missing it, but where is emr_conn_id being used in this sensor and other sensors in this file?

Contributor

I agree. It seems emr_conn_id is not used. You can just remove it
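
A minimal sketch of the cleanup suggested above, using a simplified stand-in class rather than the real Airflow sensor. `JobRunSensorSketch`, its parameters, and the `"aws_default"` default are assumptions made for illustration; the only point is that the constructor stores just the connection ID it actually uses.

```python
from typing import Any


class JobRunSensorSketch:
    """Hypothetical stand-in for a sensor in sensors/emr.py (not the real class)."""

    def __init__(
        self,
        *,
        application_id: str,
        job_run_id: str,
        aws_conn_id: str = "aws_default",  # assumed default, for illustration only
        **kwargs: Any,
    ) -> None:
        # Only aws_conn_id is stored. The emr_conn_id argument and attribute
        # are dropped because nothing in the sensor reads them.
        self.application_id = application_id
        self.job_run_id = job_run_id
        self.aws_conn_id = aws_conn_id
        self.extra_kwargs = kwargs


sensor = JobRunSensorSketch(application_id="app-123", job_run_id="run-456")
```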


airflow/providers/amazon/aws/sensors/emr.py

try:
    response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
except Exception:
Contributor

Overall question: is there any value in converting the Exception to an AirflowException? I can see there are a lot of operators which do not do that. If there is no value, I would rather not do the conversion, just for the sake of having cleaner code.

Contributor Author

I went with Exception just as a catchall for any type of exception that might be thrown by the boto3 API. I'm not sure if there's an advantage to throwing an AirflowException instead of a generic Exception. Only thing I can think of is that it would make it clear that the exception was coming from Airflow, rather than any underlying API calls (i.e. boto3) that might have been made. But that can also be understood from the message associated with the Exception.

Contributor

FWIW I think AirflowException is used too broadly in the code base. IMO AirflowException, and its subclasses, are really related to Airflow orchestration activity rather than activity done by what Airflow is orchestrating. It might make more sense to drop the try/except and bubble up whatever API call errors that might be thrown directly. Up to you though.
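
To make the two options in this thread concrete, here is a small self-contained sketch. It does not use Airflow or boto3: `FakeApiError`, `WrappedError`, and `get_job_run` are hypothetical stand-ins for an API client error, a framework-level exception such as AirflowException, and the API call itself.

```python
class FakeApiError(Exception):
    """Hypothetical stand-in for an error raised by the underlying API client."""


class WrappedError(Exception):
    """Hypothetical stand-in for a framework-level exception."""


def get_job_run(application_id: str, job_run_id: str) -> dict:
    """Hypothetical API call that always fails, to show how the error surfaces."""
    raise FakeApiError(f"job run {job_run_id} not found in {application_id}")


def poke_with_conversion(application_id: str, job_run_id: str) -> dict:
    # Option 1: catch everything and re-raise as the framework exception.
    # Callers see WrappedError; the original error survives only as __cause__.
    try:
        return get_job_run(application_id, job_run_id)
    except Exception as err:
        raise WrappedError("Unable to get job state") from err


def poke_bubbling_up(application_id: str, job_run_id: str) -> dict:
    # Option 2: no try/except, so the API error reaches the caller unchanged.
    return get_job_run(application_id, job_run_id)
```

With the second form, a caller can still catch the specific API error type and react to it, which is the behavior the last comment argues for.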

@dacort
Contributor

dacort commented Jul 29, 2022

Closes #20215

@potiuk
Member

potiuk commented Aug 2, 2022

some errors need fixing.

- Import cached_property directly functools
- Fix indentation in doc string
- Define default values in doc string explicitly
- Remove unnecessary references to emr_conn_id
- Replace print messages with log messages
Contributor

@josh-fell josh-fell left a comment

Some minor suggestions but overall LGTM (after fixing the tests of course).

.. _howto/operator:EmrServerlessCreateApplicationOperator:

Create an EMR Serverless Application
==========================
Contributor

This underline needs to be at least the length of the text it's underlining. I imagine this might be a Build Docs failure for you.
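
The rule in this comment can be checked mechanically. The helper below is a rough sketch, not part of Airflow or Sphinx: `find_short_underlines` and `is_adornment` are hypothetical names, and the check only covers simple underlined titles (no overlines, no nested markup).

```python
from typing import List, Tuple

# Characters commonly used for reStructuredText section adornments.
ADORNMENT_CHARS = set("=-~^\"'`#*+")


def is_adornment(line: str) -> bool:
    """True if the line is a non-empty run of one repeated adornment character."""
    stripped = line.rstrip()
    return bool(stripped) and stripped[0] in ADORNMENT_CHARS and len(set(stripped)) == 1


def find_short_underlines(rst_text: str) -> List[Tuple[int, str]]:
    """Return (1-based line number, title) for titles whose underline is too short."""
    lines = rst_text.splitlines()
    problems = []
    for index in range(len(lines) - 1):
        title = lines[index].rstrip()
        underline = lines[index + 1].rstrip()
        if not title or is_adornment(title) or not is_adornment(underline):
            continue
        if len(underline) < len(title):
            problems.append((index + 1, title))
    return problems


# A 36-character title with a 26-character underline, used as sample input.
snippet = "Create an EMR Serverless Application\n" + "=" * 26 + "\n"
print(find_short_underlines(snippet))
# prints [(1, 'Create an EMR Serverless Application')]
```

Extending the underline to at least the title length (36 characters here) makes the check return an empty list.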

self.log.info('Starting job on Application: %s', self.application_id)

app_state = self.hook.conn.get_application(applicationId=self.application_id)['application']['state']
if app_state not in {'CREATED', 'STARTED'}:
Contributor

Suggested change:
- if app_state not in {'CREATED', 'STARTED'}:
+ if app_state not in EmrServerlessApplicationSensor.SUCCESS_STATES:

Small suggestion to use the attrs where you can like you have in other places. Up to you though (assuming it's functionally correct as well).
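
A short sketch of the idea behind this suggestion, assuming the sensor's `SUCCESS_STATES` holds the same two values as the literal set in the diff (the reviewer flags that assumption too). `ApplicationSensorSketch` and `application_is_ready` are hypothetical names, not the real Airflow classes.

```python
class ApplicationSensorSketch:
    """Hypothetical stand-in for EmrServerlessApplicationSensor."""

    # Assumed to match the literal set used in the operator.
    SUCCESS_STATES = frozenset({"CREATED", "STARTED"})


def application_is_ready(app_state: str) -> bool:
    # Refer to the shared class attribute instead of repeating the literal set,
    # so the operator and the sensor cannot drift apart if the states change.
    return app_state in ApplicationSensorSketch.SUCCESS_STATES
```

The behavior is unchanged as long as the attribute and the literal agree; the benefit is a single place to update the accepted states.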



 - Fixed minor formatting in emr_serverless.rst
eladkal
eladkal previously requested changes Aug 4, 2022
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor

New example DAGs should follow AIP-47.
Example: #25205

Contributor

That's a good point, but the plan was to do it in two iterations since this PR is already big enough. Once this PR is merged, @syedahsn will work on converting this sample DAG into a system test. If you strongly disagree, I guess the two can be done in this PR, but it will just make it bigger and harder to review.

Development

Successfully merging this pull request may close these issues.

EMR serverless, new operator
7 participants