8 changes: 5 additions & 3 deletions providers/microsoft/azure/docs/connections/wasb.rst
@@ -27,7 +27,7 @@ The Microsoft Azure Blob Storage connection type enables the Azure Blob Storage
 Authenticating to Azure Blob Storage
 ------------------------------------
 
-There are six ways to connect to Azure Blob Storage using Airflow.
+There are seven ways to connect to Azure Blob Storage using Airflow.
 
 1. Use `token credentials`_
    i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection.
@@ -37,8 +37,9 @@ There are six ways to connect to Azure Blob Storage using Airflow.
    i.e. add a key config to ``sas_token`` in the Airflow connection.
 4. Use a `Connection String`_
    i.e. add connection string to ``connection_string`` in the Airflow connection.
-5. Use managed identity by setting ``managed_identity_client_id``, ``workload_identity_tenant_id`` (under the hook, it uses DefaultAzureCredential_ with these arguments)
-6. Fallback on DefaultAzureCredential_.
+5. Use account key by setting ``account_key`` in the Airflow connection extra fields.
+6. Use managed identity by setting ``managed_identity_client_id``, ``workload_identity_tenant_id`` (under the hook, it uses DefaultAzureCredential_ with these arguments)
+7. Fallback on DefaultAzureCredential_.
    This includes a mechanism to try different options to authenticate: Managed System Identity, environment variables, authentication through Azure CLI, etc.
 
 Only one authorization method can be used at a time. If you need to manage multiple credentials or keys then you should
@@ -84,6 +85,7 @@ Extra (optional)
 Specify the extra parameters (as json dictionary) that can be used in Azure connection.
 The following parameters are all optional:
 
+* ``account_key``: Specify the account key for Azure Blob Storage authentication. This will be checked before falling back to DefaultAzureCredential_.
 * ``client_secret_auth_config``: Extra config to pass while authenticating as a service principal using `ClientSecretCredential`_ It can be left out to fall back on DefaultAzureCredential_.
 * ``managed_identity_client_id``: The client ID of a user-assigned managed identity. If provided with `workload_identity_tenant_id`, they'll pass to ``DefaultAzureCredential``.
 * ``workload_identity_tenant_id``: ID of the application's Microsoft Entra tenant. Also called its "directory" ID. If provided with `managed_identity_client_id`, they'll pass to ``DefaultAzureCredential``.
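As a rough illustration of the new ``account_key`` extra documented above, the sketch below serializes a connection in Airflow's JSON connection format and prints it as an environment variable. The connection id ``wasb_logs``, the helper name ``build_wasb_connection_json``, and the placeholder values are assumptions for the example, not part of this PR. Using ``login`` for the storage account name follows the test connection in this PR, where ``login="testaccount"`` yields ``https://testaccount.blob.core.windows.net/``.

```python
import json


def build_wasb_connection_json(account_name: str, account_key: str) -> str:
    """Serialize a wasb connection whose account key lives in ``extra``.

    Hypothetical helper for illustration only.
    """
    connection = {
        "conn_type": "wasb",
        # Storage account name; the password field is deliberately left unset
        # so that the hook consults ``extra["account_key"]``.
        "login": account_name,
        "extra": {"account_key": account_key},
    }
    return json.dumps(connection)


# Hypothetical connection id "wasb_logs" maps to AIRFLOW_CONN_WASB_LOGS.
env_name = "AIRFLOW_CONN_" + "wasb_logs".upper()
env_value = build_wasb_connection_json("testaccount", "your_account_key")
print(f"export {env_name}='{env_value}'")
```

Storing the key in ``extra`` keeps it alongside other options such as ``proxies``; in a real deployment a secrets backend is likely a better home for it than a plain environment variable.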
2 changes: 1 addition & 1 deletion providers/microsoft/azure/docs/logging/index.rst
@@ -58,7 +58,7 @@ Setup Steps:
 ''''''''''''''
 
 #. Install the provider package with ``pip install apache-airflow-providers-microsoft-azure``.
-#. Ensure :ref:`connection <howto/connection:wasb>` is already setup with read and write access to Azure Blob Storage in the ``remote_wasb_log_container`` container and path ``remote_base_log_folder``.
+#. Ensure :ref:`connection <howto/connection:wasb>` is already setup with read and write access to Azure Blob Storage in the ``remote_wasb_log_container`` container and path ``remote_base_log_folder``. The connection should be configured with appropriate authentication credentials (such as account key, shared access key, or managed identity). For account key authentication, you can add ``account_key`` to the connection's extra fields as a JSON dictionary: ``{"account_key": "your_account_key"}``.
 #. Setup the above configuration values. Please note that the container should already exist.
 #. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
 #. Verify that logs are showing up for newly executed tasks in the container at the specified base path you have defined.
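The setup steps above refer to "the above configuration values" without showing them in this hunk. The sketch below shows one way they might be expressed as environment variables using Airflow's ``AIRFLOW__{SECTION}__{KEY}`` convention. The section names (``logging``, ``azure_remote_logging``), the ``remote_log_conn_id`` key, and every value are assumptions based on the option names mentioned in the steps, so check them against the logging page itself before relying on them.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Map a config section/key pair to Airflow's environment variable name."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Placeholder values; section names are assumed, not taken from this diff.
settings = {
    ("logging", "remote_logging"): "True",
    ("logging", "remote_base_log_folder"): "wasb-airflow-logs",
    ("logging", "remote_log_conn_id"): "wasb_logs",
    ("azure_remote_logging", "remote_wasb_log_container"): "airflow-logs",
}

env = {airflow_env_var(section, key): value for (section, key), value in settings.items()}

for name, value in env.items():
    print(f"export {name}={value}")
```

The connection named by ``remote_log_conn_id`` is the one that would carry ``{"account_key": "your_account_key"}`` in its extra field.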
1 change: 1 addition & 0 deletions providers/microsoft/azure/newsfragments/51944.bugfix.rst
@@ -0,0 +1 @@
+Fix Azure Blob Storage authentication to check ``account_key`` field in connection extra before falling back to ``DefaultAzureCredential``
Original file line number Diff line number Diff line change
@@ -67,8 +67,9 @@ class WasbHook(BaseHook):
     These parameters have to be passed in Airflow Data Base: account_name and account_key.
 
     Additional options passed in the 'extra' field of the connection will be
-    passed to the `BlockBlockService()` constructor. For example, authenticate
-    using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
+    passed to the `BlobServiceClient()` constructor. For example, authenticate
+    using a SAS token by adding {"sas_token": "YOUR_TOKEN"} or using an account key
+    by adding {"account_key": "YOUR_ACCOUNT_KEY"}.
 
     If no authentication configuration is provided, DefaultAzureCredential will be used (applicable
     when using Azure compute infrastructure).
@@ -121,7 +122,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
                 "tenant_id": "tenant",
                 "shared_access_key": "shared access key",
                 "sas_token": "account url or token",
-                "extra": "additional options for use with ClientSecretCredential or DefaultAzureCredential",
+                "extra": "additional options for use with ClientSecretCredential, DefaultAzureCredential, or account_key authentication",
             },
         }
 
@@ -198,13 +199,18 @@ def get_conn(self) -> BlobServiceClient:
         # Fall back to old auth (password) or use managed identity if not provided.
         credential = conn.password
         if not credential:
-            managed_identity_client_id = self._get_field(extra, "managed_identity_client_id")
-            workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id")
-            credential = get_sync_default_azure_credential(
-                managed_identity_client_id=managed_identity_client_id,
-                workload_identity_tenant_id=workload_identity_tenant_id,
-            )
-            self.log.info("Using DefaultAzureCredential as credential")
+            # Check for account_key in extra fields before falling back to DefaultAzureCredential
+            account_key = self._get_field(extra, "account_key")
+            if account_key:
+                credential = account_key
+            else:
+                managed_identity_client_id = self._get_field(extra, "managed_identity_client_id")
+                workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id")
+                credential = get_sync_default_azure_credential(
+                    managed_identity_client_id=managed_identity_client_id,
+                    workload_identity_tenant_id=workload_identity_tenant_id,
+                )
+                self.log.info("Using DefaultAzureCredential as credential")
         return BlobServiceClient(
             account_url=account_url,
             credential=credential,
@@ -646,13 +652,18 @@ async def get_async_conn(self) -> AsyncBlobServiceClient:
         # Fall back to old auth (password) or use managed identity if not provided.
         credential = conn.password
         if not credential:
-            managed_identity_client_id = self._get_field(extra, "managed_identity_client_id")
-            workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id")
-            credential = get_async_default_azure_credential(
-                managed_identity_client_id=managed_identity_client_id,
-                workload_identity_tenant_id=workload_identity_tenant_id,
-            )
-            self.log.info("Using DefaultAzureCredential as credential")
+            # Check for account_key in extra fields before falling back to DefaultAzureCredential
+            account_key = self._get_field(extra, "account_key")
+            if account_key:
+                credential = account_key
+            else:
+                managed_identity_client_id = self._get_field(extra, "managed_identity_client_id")
+                workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id")
+                credential = get_async_default_azure_credential(
+                    managed_identity_client_id=managed_identity_client_id,
+                    workload_identity_tenant_id=workload_identity_tenant_id,
+                )
+                self.log.info("Using DefaultAzureCredential as credential")
         self.blob_service_client = AsyncBlobServiceClient(
             account_url=account_url,
             credential=credential,
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ def setup_method(self, create_mock_connections):
         self.public_read_conn_id = "pub_read_id"
         self.public_read_conn_id_without_host = "pub_read_id_without_host"
         self.managed_identity_conn_id = "managed_identity_conn_id"
+        self.account_key_conn_id = "account_key_conn_id"
         self.authority = "https://test_authority.com"
 
         self.proxies = PROXIES
@@ -135,6 +136,12 @@ def setup_method(self, create_mock_connections):
                 conn_type=self.connection_type,
                 extra={"proxies": self.proxies},
             ),
+            Connection(
+                conn_id=self.account_key_conn_id,
+                conn_type=self.connection_type,
+                login="testaccount",
+                extra={"account_key": "test_account_key", "proxies": self.proxies},
+            ),
             Connection(
                 conn_id="sas_conn_id",
                 conn_type=self.connection_type,
@@ -223,6 +230,16 @@ def test_azure_directory_connection(self, mocked_client_secret_credential, mocke
             proxies=self.proxies,
         )
 
+    def test_account_key_connection(self, mocked_blob_service_client):
+        """Test that account_key from extra is used when no password is provided."""
+        WasbHook(wasb_conn_id=self.account_key_conn_id).get_conn()
+        mocked_blob_service_client.assert_called_once_with(
+            account_url="https://testaccount.blob.core.windows.net/",
+            credential="test_account_key",
+            proxies=self.proxies,
+            account_key="test_account_key",
+        )
+
     @pytest.mark.parametrize(
         "mocked_connection",
         [
@@ -331,6 +348,7 @@ def test_sas_token_connection(self, conn_id_str, extra_key):
             "azure_shared_key_test",
             "ad_conn_id",
             "managed_identity_conn_id",
+            "account_key_conn_id",
             "sas_conn_id",
             "extra__wasb__sas_conn_id",
             "http_sas_conn_id",
@@ -659,6 +677,7 @@ def test_connection_failure(self, mocked_blob_service_client):
             "azure_shared_key_test",
             "ad_conn_id",
             "managed_identity_conn_id",
+            "account_key_conn_id",
             "sas_conn_id",
             "extra__wasb__sas_conn_id",
             "http_sas_conn_id",