Skip to content

Conversation

@kgw7401
Copy link
Contributor

@kgw7401 kgw7401 commented Jul 7, 2025

Hi I'm making some feature on dataproc trigger but i got an error related to async when i set deferrable=True
Here is error that i encountered.

Related: #52005

Traceback (most recent call last):

  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 923, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1032, in run_trigger
    async for event in trigger.run():

  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py", line 187, in run
    job = await self.get_async_hook().get_job(
                ^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py", line 71, in get_async_hook
    return DataprocAsyncHook(
           ^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/dataproc.py", line 1286, in __init__
    super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs)

  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 280, in __init__
    self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/airflow-core/src/airflow/hooks/base.py", line 64, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 481, in get_connection_from_secrets
    conn = TaskSDKConnection.get(conn_id=conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 152, in get
    return _get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 155, in _get_connection
    msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 708, in send
    return async_to_sync(self.asend)(msg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/site-packages/asgiref/sync.py", line 186, in __call__
    raise RuntimeError(

RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

I think main reason is that even if i use get_async_hook, inside of DataprocAsyncHook inherit GoogleBaseHook that is not for async. so i change the DataprocAsyncHook's parent class into GoogleBaseAsyncHook.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Jul 7, 2025
@kgw7401
Copy link
Contributor Author

kgw7401 commented Jul 7, 2025

I’m updating the test codes.

@kgw7401
Copy link
Contributor Author

kgw7401 commented Jul 13, 2025

@RNHTTR Hi there! Could you pls review my pr when you have time?

Copy link
Contributor

@RNHTTR RNHTTR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple small nits. Also... should there be a test for get_cluster?

@kgw7401 kgw7401 requested a review from RNHTTR July 14, 2025 23:07
@kgw7401
Copy link
Contributor Author

kgw7401 commented Jul 16, 2025

@eladkal I got approved. Could you please merge this pr? cause this is related to another issue!

@kyungjunleeme
Copy link
Contributor

@kgw7401 I think that this is nice pr.

but if one morething to be added, it is nice pr more.

in test_dataproc.py

@pytest.fixture
def async_get_operation():
    def func(**kwargs):
        m = mock.MagicMock()
        m.configure_mock(**kwargs)
        f = Future()
        f.set_result(m)
        return f

    return func

it should be removed. because that fixture is not being used in that module.
could you check again?

@potiuk
Copy link
Member

potiuk commented Jul 19, 2025

it should be removed. because that fixture is not being used in that module.
could you check again?

Could be separate PR.

@potiuk
Copy link
Member

potiuk commented Jul 19, 2025

We try to focus on one thing only in one PR - this is super important for cherry-picking/reverting / tracking what was changed when

@potiuk potiuk merged commit 00867ec into apache:main Jul 19, 2025
75 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:google Google (including GCP) related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants