Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airflow/providers/asana/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@
Changelog
---------

3.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

* This release of provider is only available for Airflow 2.3+ as explained in the Apache Airflow
providers support policy https://github.com/apache/airflow/blob/main/README.md#support-for-providers

Misc
~~~~

* In AsanaHook, non-prefixed extra fields are supported and are preferred. So if you should update your connection to replace ``extra__asana__workspace`` with ``workspace`` etc.

2.0.1
.....

Expand Down
55 changes: 49 additions & 6 deletions airflow/providers/asana/hooks/asana.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Connect to Asana."""
from __future__ import annotations

from functools import wraps
from typing import Any

from asana import Client # type: ignore[attr-defined]
Expand All @@ -27,6 +28,34 @@
from airflow.hooks.base import BaseHook


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to say that this is the second time I see you using this and it should maybe live in airflow.hooks.base as a shared helper method. Is there a reason not to do it that way?

def _ensure_prefixes(conn_type):
"""
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""

def dec(func):
@wraps(func)
def inner():
field_behaviors = func()
conn_attrs = {'host', 'schema', 'login', 'password', 'port', 'extra'}

def _ensure_prefix(field):
if field not in conn_attrs and not field.startswith('extra__'):
return f"extra__{conn_type}__{field}"
else:
return field

if 'placeholders' in field_behaviors:
placeholders = field_behaviors['placeholders']
field_behaviors['placeholders'] = {_ensure_prefix(k): v for k, v in placeholders.items()}
return field_behaviors

return inner

return dec


class AsanaHook(BaseHook):
"""Wrapper around Asana Python client library."""

Expand All @@ -39,8 +68,21 @@ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.connection = self.get_connection(conn_id)
extras = self.connection.extra_dejson
self.workspace = extras.get("extra__asana__workspace") or None
self.project = extras.get("extra__asana__project") or None
self.workspace = self._get_field(extras, "workspace") or None
self.project = self._get_field(extras, "project") or None

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly here, maybe _get_field should live in BaseHook? It would make the future cleanup easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i considered that, and it is appealing, but I reasoned that it's not practical because there are frequently subtleties to the way that these params are gotten, idiosyncrasies that require little differences from one provider to another. some of them are done in the same way, but some not. and, it's just backcompat so, do we really want to add a new method to base hook for deprecated backcompat? further, even if we did add this to base hook now, we would not be able to use it until provider min version has this method (which would be a while). alternatively, eventually we could either standardize with appropriate deprecation signaling and make the breaking change for the ones that handle it differently. another option is we could just get rid of the backcompat altogether, but this would force a lot of users to change their connections, and i don't think we should because of the negative experience for users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, you're right. The big one to me is that basehook would mean waiting for airflow core to update so the providers can inherit it. If you do it this way, it's a little more work but it's "immediate" and self-contained. I think you made the right call even if it means some redundant code.

def _get_field(self, extras: dict, field_name: str):
"""Get field from extra, first checking short name, then for backcompat we check for prefixed name."""
backcompat_prefix = "extra__asana__"
if field_name.startswith('extra__'):
raise ValueError(
f"Got prefixed name {field_name}; please remove the '{backcompat_prefix}' prefix "
"when using this method."
)
if field_name in extras:
return extras[field_name] or None
prefixed_name = f"{backcompat_prefix}{field_name}"
return extras.get(prefixed_name) or None

def get_conn(self) -> Client:
return self.client
Expand All @@ -53,20 +95,21 @@ def get_connection_form_widgets() -> dict[str, Any]:
from wtforms import StringField

return {
"extra__asana__workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}

@staticmethod
@_ensure_prefixes(conn_type='asana')
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"extra__asana__workspace": "Asana workspace gid",
"extra__asana__project": "Asana project gid",
"workspace": "Asana workspace gid",
"project": "Asana project gid",
},
}

Expand Down
109 changes: 71 additions & 38 deletions tests/providers/asana/hooks/test_asana.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
# under the License.
from __future__ import annotations

import unittest
import os
from unittest.mock import patch

import pytest
from asana import Client
from pytest import param

from airflow.models import Connection
from airflow.providers.asana.hooks.asana import AsanaHook
from tests.test_utils.providers import get_provider_min_airflow_version, object_exists


class TestAsanaHook(unittest.TestCase):
class TestAsanaHook:
"""
Tests for AsanaHook Asana client retrieval
"""
Expand All @@ -40,7 +43,7 @@ def test_asana_client_retrieved(self):
):
hook = AsanaHook()
client = hook.get_conn()
self.assertEqual(type(client), Client)
assert type(client) == Client

def test_missing_password_raises(self):
"""
Expand All @@ -49,7 +52,7 @@ def test_missing_password_raises(self):
"""
with patch.object(AsanaHook, "get_connection", return_value=Connection(conn_type="asana")):
hook = AsanaHook()
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
hook.get_conn()

def test_merge_create_task_parameters_default_project(self):
Expand All @@ -62,7 +65,7 @@ def test_merge_create_task_parameters_default_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["1"]}
self.assertEqual(expected_merged_params, hook._merge_create_task_parameters("test", {}))
assert hook._merge_create_task_parameters("test", {}) == expected_merged_params

def test_merge_create_task_parameters_specified_project(self):
"""
Expand All @@ -74,10 +77,7 @@ def test_merge_create_task_parameters_specified_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["1", "2"]}
self.assertEqual(
expected_merged_params,
hook._merge_create_task_parameters("test", {"projects": ["1", "2"]}),
)
assert hook._merge_create_task_parameters("test", {"projects": ["1", "2"]}) == expected_merged_params

def test_merge_create_task_parameters_specified_workspace(self):
"""
Expand All @@ -89,7 +89,7 @@ def test_merge_create_task_parameters_specified_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "workspace": "1"}
self.assertEqual(expected_merged_params, hook._merge_create_task_parameters("test", {}))
assert hook._merge_create_task_parameters("test", {}) == expected_merged_params

def test_merge_create_task_parameters_default_project_overrides_default_workspace(self):
"""
Expand All @@ -105,7 +105,7 @@ def test_merge_create_task_parameters_default_project_overrides_default_workspac
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["1"]}
self.assertEqual(expected_merged_params, hook._merge_create_task_parameters("test", {}))
assert hook._merge_create_task_parameters("test", {}) == expected_merged_params

def test_merge_create_task_parameters_specified_project_overrides_default_workspace(self):
"""
Expand All @@ -121,10 +121,7 @@ def test_merge_create_task_parameters_specified_project_overrides_default_worksp
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"name": "test", "projects": ["2"]}
self.assertEqual(
expected_merged_params,
hook._merge_create_task_parameters("test", {"projects": ["2"]}),
)
assert hook._merge_create_task_parameters("test", {"projects": ["2"]}) == expected_merged_params

def test_merge_find_task_parameters_default_project(self):
"""
Expand All @@ -136,7 +133,7 @@ def test_merge_find_task_parameters_default_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "1"}
self.assertEqual(expected_merged_params, hook._merge_find_task_parameters({}))
assert hook._merge_find_task_parameters({}) == expected_merged_params

def test_merge_find_task_parameters_specified_project(self):
"""
Expand All @@ -148,10 +145,7 @@ def test_merge_find_task_parameters_specified_project(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "2"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"project": "2"}),
)
assert hook._merge_find_task_parameters({"project": "2"}) == expected_merged_params

def test_merge_find_task_parameters_default_workspace(self):
"""
Expand All @@ -163,10 +157,7 @@ def test_merge_find_task_parameters_default_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "1", "assignee": "1"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"assignee": "1"}),
)
assert hook._merge_find_task_parameters({"assignee": "1"}) == expected_merged_params

def test_merge_find_task_parameters_specified_workspace(self):
"""
Expand All @@ -178,10 +169,7 @@ def test_merge_find_task_parameters_specified_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "2", "assignee": "1"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"workspace": "2", "assignee": "1"}),
)
assert hook._merge_find_task_parameters({"workspace": "2", "assignee": "1"}) == expected_merged_params

def test_merge_find_task_parameters_default_project_overrides_workspace(self):
"""
Expand All @@ -196,7 +184,7 @@ def test_merge_find_task_parameters_default_project_overrides_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "1"}
self.assertEqual(expected_merged_params, hook._merge_find_task_parameters({}))
assert hook._merge_find_task_parameters({}) == expected_merged_params

def test_merge_find_task_parameters_specified_project_overrides_workspace(self):
"""
Expand All @@ -212,10 +200,7 @@ def test_merge_find_task_parameters_specified_project_overrides_workspace(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"project": "2"}
self.assertEqual(
expected_merged_params,
hook._merge_find_task_parameters({"project": "2"}),
)
assert hook._merge_find_task_parameters({"project": "2"}) == expected_merged_params

def test_merge_project_parameters(self):
"""
Expand All @@ -226,7 +211,7 @@ def test_merge_project_parameters(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "1", "name": "name"}
self.assertEqual(expected_merged_params, hook._merge_project_parameters({"name": "name"}))
assert hook._merge_project_parameters({"name": "name"}) == expected_merged_params

def test_merge_project_parameters_override(self):
"""
Expand All @@ -237,7 +222,55 @@ def test_merge_project_parameters_override(self):
with patch.object(AsanaHook, "get_connection", return_value=conn):
hook = AsanaHook()
expected_merged_params = {"workspace": "2"}
self.assertEqual(
expected_merged_params,
hook._merge_project_parameters({"workspace": "2"}),
)
assert hook._merge_project_parameters({"workspace": "2"}) == expected_merged_params

def test__ensure_prefixes_removal(self):
"""Ensure that _ensure_prefixes is removed from snowflake when airflow min version >= 2.5.0."""
path = 'airflow.providers.asana.hooks.asana._ensure_prefixes'
if not object_exists(path):
raise Exception(
"You must remove this test. It only exists to "
"remind us to remove decorator `_ensure_prefixes`."
)

if get_provider_min_airflow_version('apache-airflow-providers-asana') >= (2, 5):
raise Exception(
"You must now remove `_ensure_prefixes` from AsanaHook."
" The functionality is now taken care of by providers manager."
)

def test__ensure_prefixes(self):
"""
Check that ensure_prefixes decorator working properly

Note: remove this test when removing ensure_prefixes (after min airflow version >= 2.5.0
"""
assert list(AsanaHook.get_ui_field_behaviour()['placeholders'].keys()) == [
'password',
'extra__asana__workspace',
'extra__asana__project',
]

@pytest.mark.parametrize(
'uri',
[
param(
'a://?extra__asana__workspace=abc&extra__asana__project=abc',
id='prefix',
),
param('a://?workspace=abc&project=abc', id='no-prefix'),
],
)
def test_backcompat_prefix_works(self, uri):
with patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}):
hook = AsanaHook('my_conn')
assert hook.workspace == 'abc'
assert hook.project == 'abc'

def test_backcompat_prefix_both_prefers_short(self):
with patch.dict(
os.environ,
{"AIRFLOW_CONN_MY_CONN": 'a://?workspace=non-prefixed&extra__asana__workspace=prefixed'},
):
hook = AsanaHook('my_conn')
assert hook.workspace == 'non-prefixed'
10 changes: 10 additions & 0 deletions tests/test_utils/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,13 @@ def get_provider_version(provider_name):

info = ProvidersManager().providers[provider_name]
return semver.VersionInfo.parse(info.version)


def get_provider_min_airflow_version(provider_name):
from airflow.providers_manager import ProvidersManager

p = ProvidersManager()
deps = p.providers[provider_name].data['dependencies']
airflow_dep = [x for x in deps if x.startswith('apache-airflow')][0]
min_airflow_version = tuple(map(int, airflow_dep.split('>=')[1].split('.')))
return min_airflow_version