Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Dec 18, 2024

part of #44481

  • Added a minimal Connection user-facing object in Task SDK definition for use in the DAG file
  • Added logic to get Connections in the context. Fixed some bugs in the way related to Connection parsing/serializing!

Now, we have following Connection related objects:

  • ConnectionResponse is auto-generated and tightly coupled with the API schema.
  • ConnectionResult is runtime-specific and meant for internal communication between Supervisor & Task Runner.
  • Connection class here is where the public-facing, user-relevant aspects are exposed, hiding internal details.

Next up:

  • Same for XCom & Variable
  • Implementation of BaseHook.get_conn

Tested it with a DAG:

image

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag


class CustomOperator(BaseOperator):
    def execute(self, context):
        import os
        os.environ["AIRFLOW_CONN_AIRFLOW_DB"] = "sqlite:///home/airflow/airflow.db"
        task_id = context["task_instance"].task_id
        print(f"Hello World {task_id}!")
        print(context)
        print(context["conn"].airflow_db)
        assert context["conn"].airflow_db.conn_id == "airflow_db"


@dag()
def super_basic_run():
    CustomOperator(task_id="hello")


super_basic_run()

For case where a connection is not found

image

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@kaxil kaxil requested review from amoghrajesh and ashb December 18, 2024 16:25
@kaxil kaxil force-pushed the get-conn-working-in-context branch from f812f2f to 518f485 Compare December 18, 2024 16:27
@kaxil kaxil force-pushed the get-conn-working-in-context branch 2 times, most recently from 304140b to 01a36d2 Compare December 18, 2024 17:27
@kaxil kaxil requested review from amoghrajesh and ashb December 18, 2024 17:29
@kaxil kaxil force-pushed the get-conn-working-in-context branch 3 times, most recently from eef7296 to 38c6565 Compare December 18, 2024 20:26
- Added a minimal Connection user-facing object in Task SDK definition for use in the DAG file
- Added logic to get Connections in the context. Fixed some bugs in the way related to Connection parsing/serializing!
@kaxil kaxil force-pushed the get-conn-working-in-context branch from 38c6565 to 2937a0c Compare December 19, 2024 05:21
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comments have been addressed

Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments lying open, rest LGTM

@kaxil kaxil merged commit 4de24a1 into apache:main Dec 19, 2024
48 checks passed
@kaxil kaxil deleted the get-conn-working-in-context branch December 19, 2024 09:51
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
part of apache#44481

- Added a minimal Connection user-facing object in Task SDK definition for use in the DAG file
- Added logic to get Connections in the context. Fixed some bugs in the way related to Connection parsing/serializing!


Now, we have following Connection related objects:
- `ConnectionResponse` is auto-generated and tightly coupled with the API schema.
- `ConnectionResult` is runtime-specific and meant for internal communication between Supervisor & Task Runner.
- `Connection` class here is where the public-facing, user-relevant aspects are exposed, hiding internal details.

**Next up**:

- Same for XCom & Variable
- Implementation of BaseHook.get_conn

Tested it with a DAG:

<img width="1711" alt="image" src="https://github.com/user-attachments/assets/14d28fb7-f6c5-4fbe-b226-46873af2d0f3" />

DAG:

```py
from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag


class CustomOperator(BaseOperator):
    def execute(self, context):
        import os
        os.environ["AIRFLOW_CONN_AIRFLOW_DB"] = "sqlite:///home/airflow/airflow.db"
        task_id = context["task_instance"].task_id
        print(f"Hello World {task_id}!")
        print(context)
        print(context["conn"].airflow_db)
        assert context["conn"].airflow_db.conn_id == "airflow_db"


@dag()
def super_basic_run():
    CustomOperator(task_id="hello")


super_basic_run()

```

For case where a **connection is not found**

<img width="1435" alt="image" src="https://github.com/user-attachments/assets/7c5e0cb4-6ed4-41aa-9a57-e5641adce954" />
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

3 participants