Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion providers/tableau/docs/connections/tableau.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ There are two ways to connect to Tableau using Airflow.
2. Use a `Token Authentication
<https://tableau.github.io/server-client-python/docs/api-ref#personalaccesstokenauth-class>`_
i.e. add a ``token_name`` and ``personal_access_token`` to the Airflow connection (deprecated).
3. Use `JSON Web Token (JWT) Authentication
<https://tableau.github.io/server-client-python/docs/sign-in-out.html#sign-in-with-json-web-token-jwt>`_
i.e add a ``jwt_file`` or a ``jwt_token`` to the Airflow connection extras.

Authentication by personal token was deprecated as Tableau automatically invalidates opened
personal token connection if one or more parallel connections with the same token are opened.
Expand Down Expand Up @@ -79,7 +82,8 @@ Extra (optional)
This is used with token authentication.
* ``verify``: Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to True.
* ``cert``: if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair.

* ``jwt_token`` - If jwt authentication should be used, the value of token is given via this parameter.
* ``jwt_file`` - If jwt authentication should be used, the location on disk for the file containing the jwt token.

When specifying the connection in environment variable you should specify
it using URI syntax.
Expand Down
2 changes: 1 addition & 1 deletion providers/tableau/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ requires-python = "~=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.10.0",
"tableauserverclient>=0.25",
"tableauserverclient>=0.27",
]

[dependency-groups]
Expand Down
35 changes: 33 additions & 2 deletions providers/tableau/src/airflow/providers/tableau/hooks/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import time
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

from tableauserverclient import Pager, Server, TableauAuth
from tableauserverclient import JWTAuth, Pager, Server, TableauAuth

from airflow.exceptions import AirflowException
from airflow.utils.helpers import exactly_one

try:
from airflow.sdk import BaseHook
Expand Down Expand Up @@ -113,8 +115,33 @@ def get_conn(self) -> Auth.contextmgr:

:return: an authorized Tableau Server Context Manager object.
"""
if self.conn.login and self.conn.password:
extra = self.conn.extra_dejson
password_auth_set = self.conn.login and self.conn.password
jwt_auth_set = extra.get("auth") == "jwt"

if password_auth_set and jwt_auth_set:
raise AirflowException(
"Username/password authentication and JWT authentication cannot be used simultaneously. Please specify only one authentication method."
)
if password_auth_set:
return self._auth_via_password()
if jwt_auth_set:
if not exactly_one(jwt_file := "jwt_file" in extra, jwt_token := "jwt_token" in extra):
msg = (
"When auth set to 'jwt' then expected exactly one parameter 'jwt_file' or 'jwt_token'"
" in connection extra, but "
)
if jwt_file and jwt_token:
msg += "provided both."
else:
msg += "none of them provided."
raise ValueError(msg)

if jwt_file:
self.jwt_token = Path(extra["jwt_file"]).read_text()
else:
self.jwt_token = extra["jwt_token"]
return self._auth_via_jwt()
raise NotImplementedError("No Authentication method found for given Credentials!")

def _auth_via_password(self) -> Auth.contextmgr:
Expand All @@ -125,6 +152,10 @@ def _auth_via_password(self) -> Auth.contextmgr:
)
return self.server.auth.sign_in(tableau_auth)

def _auth_via_jwt(self) -> Auth.contextmgr:
jwt_auth = JWTAuth(jwt=self.jwt_token, site_id=self.site_id)
return self.server.auth.sign_in(jwt_auth)

def get_all(self, resource_name: str) -> Pager:
"""
Get all items of the given resource.
Expand Down
157 changes: 157 additions & 0 deletions providers/tableau/tests/unit/tableau/hooks/test_tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
# under the License.
from __future__ import annotations

import json
import tempfile
from unittest.mock import MagicMock, patch

import pytest

from airflow import configuration, models
from airflow.exceptions import AirflowException
from airflow.providers.tableau.hooks.tableau import TableauHook, TableauJobFinishCode


Expand Down Expand Up @@ -73,6 +76,40 @@ def setup_connections(self, create_connection_without_db):
extra='{"verify": false}',
)
)
create_connection_without_db(
models.Connection(
conn_id="tableau_test_jwt_auth",
conn_type="tableau",
host="tableau",
extra='{"auth": "jwt", "jwt_token": "fake_jwt_token", "site_id": ""}',
)
)
create_connection_without_db(
models.Connection(
conn_id="tableau_test_ssl_connection_certificates_path_with_jwt",
conn_type="tableau",
host="tableau",
extra='{"auth": "jwt", "jwt_token": "fake_jwt_token", "site_id": "", "verify": "my_cert_path", "cert": "my_client_cert_path"}',
)
)
create_connection_without_db(
models.Connection(
conn_id="tableau_test_jwt_auth_no_token",
conn_type="tableau",
host="tableau",
extra='{"auth": "jwt", "site_id": ""}',
)
)
create_connection_without_db(
models.Connection(
conn_id="tableau_test_both_auth",
conn_type="tableau",
host="tableau",
login="user",
password="password",
extra='{"auth": "jwt", "jwt_token": "fake_jwt_token", "site_id": ""}',
)
)

@patch("airflow.providers.tableau.hooks.tableau.TableauAuth")
@patch("airflow.providers.tableau.hooks.tableau.Server")
Expand All @@ -90,6 +127,126 @@ def test_get_conn_auth_via_password_and_site_in_connection(self, mock_server, mo
mock_server.return_value.auth.sign_in.assert_called_once_with(mock_tableau_auth.return_value)
mock_server.return_value.auth.sign_out.assert_called_once_with()

@patch("airflow.providers.tableau.hooks.tableau.JWTAuth")
@patch("airflow.providers.tableau.hooks.tableau.Server")
def test_jwt_auth(self, mock_server, mock_tableau_jwt_auth):
"""
Test get conn using JWT authentication via a token string
"""
with TableauHook(tableau_conn_id="tableau_test_jwt_auth") as tableau_hook:
mock_server.assert_called_once_with(tableau_hook.conn.host)
mock_tableau_jwt_auth.assert_called_once_with(
jwt="fake_jwt_token",
site_id="",
)
mock_server.return_value.auth.sign_in.assert_called_once_with(mock_tableau_jwt_auth.return_value)
mock_server.return_value.auth.sign_out.assert_called_once_with()

@patch("airflow.providers.tableau.hooks.tableau.JWTAuth")
@patch("airflow.providers.tableau.hooks.tableau.Server")
def test_jwt_auth_with_ssl(self, mock_server, mock_tableau_jwt_auth):
"""
Test get conn using JWT authentication via a token string and ssl
"""
with TableauHook(
tableau_conn_id="tableau_test_ssl_connection_certificates_path_with_jwt"
) as tableau_hook:
mock_server.assert_called_once_with(tableau_hook.conn.host)
mock_server.return_value.add_http_options.assert_called_once_with(
options_dict={
"verify": tableau_hook.conn.extra_dejson["verify"],
"cert": tableau_hook.conn.extra_dejson["cert"],
}
)
mock_tableau_jwt_auth.assert_called_once_with(
jwt="fake_jwt_token",
site_id="",
)
mock_server.return_value.auth.sign_in.assert_called_once_with(mock_tableau_jwt_auth.return_value)
mock_server.return_value.auth.sign_out.assert_called_once_with()

def test_jwt_auth_with_no_token_provided(self):
"""
Test get conn using JWT authentication without providing a token
"""
with pytest.raises(
ValueError,
match=r"When auth set to 'jwt' then expected exactly one parameter 'jwt_file' or 'jwt_token' in connection extra, but none of them provided.",
):
TableauHook(tableau_conn_id="tableau_test_jwt_auth_no_token").get_conn()

@patch("airflow.providers.tableau.hooks.tableau.JWTAuth")
def test_jwt_auth_with_two_tokens_provided(self, mock_tableau_jwt_auth, create_connection_without_db):
"""
Test get conn using JWT authentication while providing both a string token and a path

The connection setup is done within this test to handle the creation of a temporary file
for the JWT token, keeping the shared setup_connections function focused solely on connection logic
"""
fake_jwt_token = "fake_jwt_token_from_file"
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as jwt_file:
jwt_file.write(fake_jwt_token)
jwt_file_path = jwt_file.name
create_connection_without_db(
models.Connection(
conn_id="tableau_test_jwt_file_auth_two_tokens",
conn_type="tableau",
host="tableau",
extra=json.dumps(
{"auth": "jwt", "jwt_file": jwt_file_path, "jwt_token": "fake_jwt_token", "site_id": ""}
),
)
)
with pytest.raises(
ValueError,
match=r"When auth set to 'jwt' then expected exactly one parameter 'jwt_file' or 'jwt_token' in connection extra, but provided both.",
):
TableauHook(tableau_conn_id="tableau_test_jwt_file_auth_two_tokens").get_conn()
mock_tableau_jwt_auth.assert_not_called()

@patch("airflow.providers.tableau.hooks.tableau.JWTAuth")
@patch("airflow.providers.tableau.hooks.tableau.Server")
def test_jwt_auth_from_file(self, mock_server, mock_tableau_jwt_auth, create_connection_without_db):
"""
Test get conn using JWT token read from file

The connection setup is done within this test to handle the creation of a temporary file
for the JWT token, keeping the shared setup_connections function focused solely on connection logic
"""
fake_jwt_token = "fake_jwt_token_from_file"
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as jwt_file:
jwt_file.write(fake_jwt_token)
jwt_file_path = jwt_file.name
create_connection_without_db(
models.Connection(
conn_id="tableau_test_jwt_file_auth",
conn_type="tableau",
host="tableau",
extra=json.dumps({"auth": "jwt", "jwt_file": jwt_file_path, "site_id": ""}),
)
)

with TableauHook(tableau_conn_id="tableau_test_jwt_file_auth") as tableau_hook:
mock_server.assert_called_once_with(tableau_hook.conn.host)
mock_tableau_jwt_auth.assert_called_once_with(
jwt=fake_jwt_token,
site_id="",
)
mock_server.return_value.auth.sign_in.assert_called_once_with(mock_tableau_jwt_auth.return_value)
mock_server.return_value.auth.sign_out.assert_called_once_with()

@patch("airflow.providers.tableau.hooks.tableau.TableauAuth")
def test_both_auth(self, mock_tableau_auth):
"""
Test whether an error is thrown if both auth types are set
"""
with pytest.raises(
AirflowException,
match=r"Username/password authentication and JWT authentication cannot be used simultaneously. Please specify only one authentication method.",
):
TableauHook(tableau_conn_id="tableau_test_both_auth").get_conn()
mock_tableau_auth.assert_not_called()

@patch("airflow.providers.tableau.hooks.tableau.TableauAuth")
@patch("airflow.providers.tableau.hooks.tableau.Server")
def test_get_conn_ssl_cert_path(self, mock_server, mock_tableau_auth):
Expand Down