-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Add support for async callables in PythonOperator #60268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
9124c9a to
cd4ec26
Compare
ebf2873 to
0b341e6
Compare
…epends on modified comms supervisor which cannot be backported to older Airflow versions Add support for async callables in PythonOperator (apache#59087) * refactor: Implemented BaseAsyncOperator in task-sdk * refactor: Now PythonOperator extends BaseAsyncOperator * refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions --------- Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com> (cherry picked from commit 9cab6fb)
…nstead of NotImplementedError
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there, once newsfragment is added we are good
And the newfragment ;-) |
|
@kaxil If newsfragment is ok for you then I think it's finally ready to merge. |
|
Static check is failing, worth looking at https://github.com/apache/airflow/blob/main/airflow-core/newsfragments/54505.significant.rst or other examples |
|
Since rebase of PR branch I now get following error, which is strange as the 'use next version' is defined in toml file: |
|
@dabla I think that was because the versions were bumped for the release yesterday: #60437 So you'd need to update the following line: airflow/providers/standard/pyproject.toml Line 77 in 33cc0ac
to add |
Yes indeed, cause I knew I added it, but due to rebase it was gone again as versions where indeed bumped. Re-added it now so hopefully we will be fine. |
|
@kaxil last build was fine, I suppose we can merge now? |
providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
Show resolved
Hide resolved
This PR is related to the discussion I started on the [devlist](https://lists.apache.org/thread/ztnfsqolow4v1zsv4pkpnxc1fk0hbf2p) and which allows you to natively execute async code on PythonOperators. There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3 Below an example which show you how it can be used with async hooks: ``` @task(show_return_value_in_logs=False) async def load_xml_files(files): import asyncio from io import BytesIO from more_itertools import chunked from os import cpu_count from tenacity import retry, stop_after_attempt, wait_fixed from airflow.providers.sftp.hooks.sftp import SFTPClientPool print("number of files:", len(files)) async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool: @Retry(stop=stop_after_attempt(3), wait=wait_fixed(5)) async def download_file(file): async with pool.get_sftp_client() as sftp: print("downloading:", file) buffer = BytesIO() async with sftp.open(file, encoding=xml_encoding) as remote_file: data = await remote_file.read() buffer.write(data.encode(xml_encoding)) buffer.seek(0) return buffer for batch in chunked(files, cpu_count() * 2): tasks = [asyncio.create_task(download_file(f)) for f in batch] # Wait for this batch to finish before starting the next for task in asyncio.as_completed(tasks): result = await task # Do something with result or accumulate it and return it as an XCom ``` This PR will fix additional remarks made by @kaxil on the original [PR](apache#59087) which has been reverted.
This PR is related to the discussion I started on the devlist and which allows you to natively execute async code on PythonOperators.
There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3
Below an example which show you how it can be used with async hooks:
This PR will fix additional remarks made by @kaxil on the original PR which has been reverted.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.