
Conversation

@dabla (Contributor) commented Dec 5, 2025

This PR is related to the discussion I started on the [devlist](https://lists.apache.org/thread/ztnfsqolow4v1zsv4pkpnxc1fk0hbf2p) and allows you to natively execute async code in PythonOperators.

There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3

Below is an example that shows how it can be used with async hooks:

```
@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    # sftp_conn and xml_encoding are assumed to be defined in the enclosing DAG
    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:

        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
                # Do something with result or accumulate it and return it as an XCom
```
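
As a minimal illustration of the feature on its own, any `async def` callable can now be passed to the `@task` decorator directly (a hypothetical sketch, assuming the Airflow 3 task-sdk decorators):

```
import asyncio

from airflow.sdk import dag, task


@dag
def async_example():
    @task
    async def ping() -> str:
        # The operator drives this coroutine in its own event loop,
        # so plain awaits work without a deferrable trigger.
        await asyncio.sleep(1)
        return "pong"

    ping()


async_example()
```

No deferral or trigger is involved; the operator simply drives the coroutine to completion in its own event loop.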


@jason810496 self-requested a review December 5, 2025 11:29
@Lee-W self-requested a review December 5, 2025 15:04
@uranusjr (Member) commented Dec 8, 2025

Why do we need a separate operator class? Can this not just be a part of PythonOperator?

@dabla (Contributor, Author) commented Jan 7, 2026

I think this one is ready for review.

@potiuk (Member) commented Jan 8, 2026

Very nice and clean code!

@dabla merged commit 9cab6fb into apache:main Jan 8, 2026
127 checks passed
@jason810496 (Member) commented

#protm

Review thread on:

```
# always be in the return type union
return resp  # type: ignore[return-value]

return self._get_response()
```
Member: If a sync `send()` and an async `asend()` are called concurrently (which is now possible with mixed sync/async code), could the response reads interleave? The async version you have keeps the read inside the lock, but the sync version doesn't, so I'm curious.

dabla (Contributor, Author): Normally it should behave correctly, unless I missed something: the async version acquires both the async and the sync lock, while the sync version only acquires the sync lock. We have been running the async PythonOperator since right after the Summit and haven't experienced any issues with it so far. I did have issues in the beginning, which is how I came up with this solution; without those locks it could indeed lead to mix-ups very quickly.
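
To make the locking scheme described above concrete, here is a minimal, hypothetical sketch of the pattern (class and helper names are illustrative, not the PR's actual supervisor code): the sync path serializes on a thread lock, while the async path takes an asyncio lock first and then the same thread lock, so a `send()` and an `asend()` can never interleave their write/read pairs.

```
import asyncio
import threading


class CommsChannel:
    """Illustrative request/response channel shared by sync and async callers."""

    def __init__(self):
        self._sync_lock = threading.Lock()
        self._async_lock = asyncio.Lock()
        self._last = None

    def _write(self, msg):
        self._last = msg  # stand-in for the real socket write

    def _read(self):
        return f"resp:{self._last}"  # stand-in for the real socket read

    def send(self, msg):
        # Keep the write and its matching read in one critical section.
        with self._sync_lock:
            self._write(msg)
            return self._read()

    async def asend(self, msg):
        # Serialize async callers cooperatively, then exclude sync callers
        # by taking the same thread lock around the write/read pair.
        async with self._async_lock:
            with self._sync_lock:
                self._write(msg)
                return self._read()
```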



Review thread on:

```
-class PythonOperator(BaseOperator):
+class PythonOperator(BaseAsyncOperator):
```
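
For readers skimming the diff, a minimal, hypothetical sketch of the relationship it introduces (the import path and everything beyond `aexecute()` are assumptions; the PR's real `execute()`, quoted further down, manages an `event_loop()` context manager and honours `execution_timeout`):

```
import asyncio

from airflow.sdk import BaseOperator  # assumption: task-sdk public import path


class BaseAsyncOperator(BaseOperator):
    """Drives an async aexecute() behind the classic sync execute() entrypoint."""

    async def aexecute(self, context):
        raise NotImplementedError

    def execute(self, context):
        # Simplified stand-in for the PR's event_loop()-managed version.
        return asyncio.run(self.aexecute(context))
```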
Member: Should BranchPythonOperator and ShortCircuitOperator, which inherit from this operator, support or reject async callables?

dabla (Contributor, Author): They do so automatically, as they inherit from PythonOperator, which is an async operator by default. The BaseBranchOperator should not be async.
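
A minimal sketch of what that implies for branching (hypothetical task id; assumes `@task.branch` maps to BranchPythonOperator as in mainline Airflow):

```
import asyncio

from airflow.sdk import task


@task.branch
async def choose_path() -> str:
    # BranchPythonOperator inherits PythonOperator, so an async callable
    # is accepted here too; it still has to return downstream task id(s).
    await asyncio.sleep(0)
    return "fast_path"
```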

Comment on lines +1713 to +1723
```
def execute(self, context):
    """Run `aexecute()` inside an event loop."""
    with event_loop() as loop:
        if self.execution_timeout:
            return loop.run_until_complete(
                asyncio.wait_for(
                    self.aexecute(context),
                    timeout=self.execution_timeout.total_seconds(),
                )
            )
        return loop.run_until_complete(self.aexecute(context))
```
Member: We should add a test for this case specifically, tbh, so that a regression won't make async tasks run indefinitely.
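
A minimal pytest sketch of such a regression test (hypothetical: the import path assumes the standard provider, and the timeout is assumed to surface as `TimeoutError` from `asyncio.wait_for`):

```
import asyncio
import datetime

import pytest

from airflow.providers.standard.operators.python import PythonOperator


async def never_returns():
    await asyncio.sleep(3600)


def test_execution_timeout_cancels_async_callable():
    # Regression guard: an async callable that never finishes must be
    # cancelled once execution_timeout elapses, not run indefinitely.
    op = PythonOperator(
        task_id="never_ends",
        python_callable=never_returns,
        execution_timeout=datetime.timedelta(seconds=1),
    )
    with pytest.raises((TimeoutError, asyncio.TimeoutError)):
        op.execute(context={})
```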

@kaxil (Member) commented Jan 8, 2026

@dabla Apologies for the late review, but could you take a look at them, please?

dabla added a commit to dabla/airflow that referenced this pull request Jan 8, 2026
kaxil pushed a commit that referenced this pull request Jan 8, 2026
chirodip98 pushed a commit to chirodip98/airflow-contrib that referenced this pull request Jan 9, 2026
* refactor: Implemented BaseAsyncOperator in task-sdk

* refactor: Now PythonOperator extends BaseAsyncOperator

* refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions

---------

Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com>
chirodip98 pushed a commit to chirodip98/airflow-contrib that referenced this pull request Jan 9, 2026
dabla added a commit to dabla/airflow that referenced this pull request Jan 9, 2026
…epends on modified comms supervisor which cannot be backported to older Airflow versions

Add support for async callables in PythonOperator (apache#59087)

(cherry picked from commit 9cab6fb)
stegololz pushed a commit to stegololz/airflow that referenced this pull request Jan 9, 2026
stegololz pushed a commit to stegololz/airflow that referenced this pull request Jan 9, 2026
kaxil pushed a commit that referenced this pull request Jan 15, 2026
This PR will fix additional remarks made by @kaxil on the original [PR](#59087), which has been reverted.
jason810496 pushed a commit to jason810496/airflow that referenced this pull request Jan 22, 2026