Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@

from airflow.exceptions import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook
from airflow.providers.google.common.hooks.base_google import (
PROVIDE_PROJECT_ID,
GoogleBaseAsyncHook,
GoogleBaseHook,
)

if TYPE_CHECKING:
from google.api_core import operation
Expand Down Expand Up @@ -159,7 +163,7 @@ def list_jobs(
return list(itertools.islice(jobs, limit))


class CloudRunAsyncHook(GoogleBaseHook):
class CloudRunAsyncHook(GoogleBaseAsyncHook):
"""
Async hook for the Google Cloud Run service.

Expand All @@ -174,6 +178,8 @@ class CloudRunAsyncHook(GoogleBaseHook):
account from the list granting this role to the originating account.
"""

sync_hook_class = GoogleBaseHook

def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
Expand All @@ -183,16 +189,16 @@ def __init__(
self._client: JobsAsyncClient | None = None
super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs)

def get_conn(self):
async def get_conn(self):
if self._client is None:
self._client = JobsAsyncClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
sync_hook = await self.get_sync_hook()
self._client = JobsAsyncClient(credentials=sync_hook.get_credentials(), client_info=CLIENT_INFO)

return self._client

async def get_operation(self, operation_name: str) -> operations_pb2.Operation:
return await self.get_conn().get_operation(
operations_pb2.GetOperationRequest(name=operation_name), timeout=120
)
conn = await self.get_conn()
return await conn.get_operation(operations_pb2.GetOperationRequest(name=operation_name), timeout=120)


class CloudRunServiceHook(GoogleBaseHook):
Expand Down