Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential
from azure.core.credentials_async import AsyncTokenCredential
from azure.storage.blob._models import BlobProperties
from azure.storage.blob.aio._list_blobs_helper import BlobPrefix

Expand Down Expand Up @@ -614,8 +615,9 @@ async def get_async_conn(self) -> AsyncBlobServiceClient:
tenant = self._get_field(extra, "tenant_id")
if tenant:
# use Active Directory auth
app_id = conn.login
app_secret = conn.password
app_id = conn.login or ""
app_secret = conn.password or ""

token_credential = AsyncClientSecretCredential(
tenant, app_id, app_secret, **client_secret_auth_config
)
Expand Down Expand Up @@ -652,6 +654,7 @@ async def get_async_conn(self) -> AsyncBlobServiceClient:
return self.blob_service_client

# Fall back to old auth (password) or use managed identity if not provided.
credential: str | AsyncTokenCredential | None
credential = conn.password
if not credential:
# Check for account_key in extra fields before falling back to DefaultAzureCredential
Expand Down Expand Up @@ -681,6 +684,9 @@ def _get_blob_client(self, container_name: str, blob_name: str) -> AsyncBlobClie
:param container_name: the name of the blob container
:param blob_name: the name of the blob. This needs not be existing
"""
if self.blob_service_client is None:
raise AirflowException("BlobServiceClient is not initialized")

return self.blob_service_client.get_blob_client(container=container_name, blob=blob_name)

async def check_for_blob_async(self, container_name: str, blob_name: str, **kwargs: Any) -> bool:
Expand All @@ -704,6 +710,9 @@ def _get_container_client(self, container_name: str) -> AsyncContainerClient: #

:param container_name: the name of the container
"""
if self.blob_service_client is None:
raise AirflowException("BlobServiceClient is not initialized")

return self.blob_service_client.get_container_client(container_name)

async def get_blobs_list_async(
Expand Down
Loading