|
23 | 23 |
|
24 | 24 | from sqlalchemy import select |
25 | 25 |
|
26 | | -from airflow.api_internal.internal_api_call import internal_api_call |
| 26 | +from airflow.models import Variable |
27 | 27 | from airflow.secrets import BaseSecretsBackend |
28 | 28 | from airflow.utils.session import NEW_SESSION, provide_session |
29 | 29 |
|
30 | 30 | if TYPE_CHECKING: |
31 | 31 | from sqlalchemy.orm import Session |
32 | 32 |
|
33 | | - from airflow.models.connection import Connection |
| 33 | + from airflow.models import Connection |
34 | 34 |
|
35 | 35 |
|
36 | 36 | class MetastoreBackend(BaseSecretsBackend): |
37 | 37 | """Retrieves Connection object and Variable from airflow metastore database.""" |
38 | 38 |
|
39 | 39 | @provide_session |
40 | 40 | def get_connection(self, conn_id: str, session: Session = NEW_SESSION) -> Connection | None: |
41 | | - return MetastoreBackend._fetch_connection(conn_id, session=session) |
42 | | - |
43 | | - @provide_session |
44 | | - def get_variable(self, key: str, session: Session = NEW_SESSION) -> str | None: |
45 | 41 | """ |
46 | | - Get Airflow Variable from Metadata DB. |
| 42 | + Get Airflow Connection from Metadata DB. |
47 | 43 |
|
48 | | - :param key: Variable Key |
49 | | - :return: Variable Value |
| 44 | + :param conn_id: Connection ID |
| 45 | + :param session: SQLAlchemy Session |
| 46 | + :return: Connection Object |
50 | 47 | """ |
51 | | - return MetastoreBackend._fetch_variable(key=key, session=session) |
52 | | - |
53 | | - @staticmethod |
54 | | - @internal_api_call |
55 | | - @provide_session |
56 | | - def _fetch_connection(conn_id: str, session: Session = NEW_SESSION) -> Connection | None: |
57 | | - from airflow.models.connection import Connection |
| 48 | + from airflow.models import Connection |
58 | 49 |
|
59 | 50 | conn = session.scalar(select(Connection).where(Connection.conn_id == conn_id).limit(1)) |
60 | 51 | session.expunge_all() |
61 | 52 | return conn |
62 | 53 |
|
63 | | - @staticmethod |
64 | | - @internal_api_call |
65 | 54 | @provide_session |
66 | | - def _fetch_variable(key: str, session: Session = NEW_SESSION) -> str | None: |
67 | | - from airflow.models.variable import Variable |
| 55 | + def get_variable(self, key: str, session: Session = NEW_SESSION) -> str | None: |
| 56 | + """ |
| 57 | + Get Airflow Variable from Metadata DB. |
68 | 58 |
|
| 59 | + :param key: Variable Key |
| 60 | + :param session: SQLAlchemy Session |
| 61 | + :return: Variable Value |
| 62 | + """ |
69 | 63 | var_value = session.scalar(select(Variable).where(Variable.key == key).limit(1)) |
70 | 64 | session.expunge_all() |
71 | 65 | if var_value: |
|
0 commit comments