Add support for async callables in PythonOperator #59087
Conversation
Why do we need a separate operator class? Can this not just be a part of PythonOperator?

I think this one is ready for review.
Very nice and clean code!

#protm
```python
# always be in the return type union
return resp  # type: ignore[return-value]

return self._get_response()
```
If a sync send() and an async asend() are called concurrently (which is now possible with mixed sync/async code), could the response reads interleave? The async version you have keeps the read inside the lock, but the sync version doesn't, so I'm curious.
Normally it should behave correctly, unless I missed something: the async version checks both the async and the sync lock, while the sync version only checks the sync lock. We have been running the async PythonOperator since right after the Summit and haven't experienced any issues with it until today. I did have issues in the beginning, and that is how I came up with this solution, as without those locks it could indeed lead to mix-ups very quickly.
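To illustrate the dual-lock idea described above, here is a minimal sketch; the class and method names are made up for illustration and this is not the actual supervisor code:

```python
import asyncio
import threading


class CommsSketch:
    """Toy stand-in for the comms object; illustrates the locking only."""

    def __init__(self):
        self._sync_lock = threading.Lock()   # guards the write/read exchange
        self._async_lock = asyncio.Lock()    # additionally serializes coroutines

    def send(self, msg):
        # Sync path: only the threading lock is needed, and the response read
        # stays inside the lock so another sync caller cannot interleave.
        with self._sync_lock:
            self._write(msg)
            return self._get_response()

    async def asend(self, msg):
        # Async path: take the asyncio lock first so concurrent coroutines queue
        # up, then the same threading lock so a concurrent sync send() is
        # excluded as well.
        async with self._async_lock:
            with self._sync_lock:
                self._write(msg)
                return self._get_response()

    def _write(self, msg):
        print("write:", msg)

    def _get_response(self):
        return "ok"
```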
```diff
-class PythonOperator(BaseOperator):
+class PythonOperator(BaseAsyncOperator):
```
Should BranchPythonOperator and ShortCircuitOperator, which inherit from this operator, support or reject async callables?
They do automatically, as they inherit from PythonOperator, which is an async operator by default. The BaseBranchOperator should not be async.
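As a hedged illustration of what that inheritance implies in practice (assuming the standard-provider import path and that an async `python_callable` is accepted wherever PythonOperator is the base class):

```python
from airflow.providers.standard.operators.python import BranchPythonOperator


async def choose_branch(**context):
    # Pretend we await some async check here before deciding which branch to take.
    return "fast_path"


# Because BranchPythonOperator subclasses PythonOperator, the async callable
# above should go through the same aexecute() path as a plain PythonOperator.
branch = BranchPythonOperator(
    task_id="branch",
    python_callable=choose_branch,
)
```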
```python
def execute(self, context):
    """Run `aexecute()` inside an event loop."""
    with event_loop() as loop:
        if self.execution_timeout:
            return loop.run_until_complete(
                asyncio.wait_for(
                    self.aexecute(context),
                    timeout=self.execution_timeout.total_seconds(),
                )
            )
        return loop.run_until_complete(self.aexecute(context))
```
We should add a test for this case specifically tbh so that a regression won't make async tasks run indefinitely.
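Something along these lines could serve as a starting point. This is only a rough sketch: the import path, the exact exception raised, and passing a bare dict as context are assumptions the real test would need to adjust:

```python
import asyncio
from datetime import timedelta

import pytest

from airflow.providers.standard.operators.python import PythonOperator


def test_execution_timeout_cancels_async_callable():
    async def never_finishes():
        # Simulates an async callable that would otherwise run forever.
        await asyncio.sleep(3600)

    op = PythonOperator(
        task_id="slow_async_task",
        python_callable=never_finishes,
        execution_timeout=timedelta(seconds=1),
    )

    # If execution_timeout stopped being forwarded to asyncio.wait_for, this
    # call would hang instead of raising -- exactly the regression to catch.
    with pytest.raises((asyncio.TimeoutError, TimeoutError)):
        op.execute(context={})
```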
@dabla Apologies for a late review, but could you take a look at them, please?
…)" This reverts commit 9cab6fb.
* refactor: Implemented BaseAsyncOperator in task-sdk
* refactor: Now PythonOperator extends BaseAsyncOperator
* refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions

---------

Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com>
…)" (apache#60266) This reverts commit 9cab6fb.
…epends on modified comms supervisor which cannot be backported to older Airflow versions

Add support for async callables in PythonOperator (apache#59087)

* refactor: Implemented BaseAsyncOperator in task-sdk
* refactor: Now PythonOperator extends BaseAsyncOperator
* refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions

---------

Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com>
(cherry picked from commit 9cab6fb)
This PR is related to the discussion I started on the [devlist](https://lists.apache.org/thread/ztnfsqolow4v1zsv4pkpnxc1fk0hbf2p) and allows you to natively execute async code in PythonOperators.

There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3

Below is an example that shows how it can be used with async hooks:

```python
@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed
    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:

        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
                # Do something with result or accumulate it and return it as an XCom
```

This PR also addresses the additional remarks made by @kaxil on the original [PR](#59087), which has been reverted.
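For completeness, a minimal sketch of the simplest case this PR enables, using the classic operator API instead of the `@task` decorator; the DAG id and callable below are illustrative only:

```python
import asyncio

from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import dag


async def fetch_greeting():
    # Any awaitable work goes here; the operator runs it inside an event loop.
    await asyncio.sleep(1)
    return "hello"


@dag(dag_id="async_python_operator_example")
def async_python_operator_example():
    PythonOperator(
        task_id="fetch_greeting",
        python_callable=fetch_greeting,  # an async def is now accepted directly
    )


async_python_operator_example()
```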