Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Apr 2, 2025

closes #47447

closes #47948

image

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@gopidesupavan
Copy link
Member

Oh thanks kaxil , wrote similar way for the _get_count() for triggers as it also uses inside poke, i will fork this branch and will raise my changes in different PR for WorkflowTrigger.

@kaxil kaxil force-pushed the external-task-sensor branch 2 times, most recently from d272433 to 7566433 Compare April 3, 2025 00:42
@kaxil kaxil marked this pull request as ready for review April 3, 2025 00:43
@kaxil kaxil requested review from amoghrajesh and ashb as code owners April 3, 2025 00:43
@kaxil
Copy link
Member Author

kaxil commented Apr 3, 2025

@gopidesupavan I had to add /execution/dag-runs/count endpoint too since the ExternalTaskSensor needed it

@kaxil kaxil force-pushed the external-task-sensor branch from 7566433 to 25d6863 Compare April 3, 2025 01:10
@gopidesupavan
Copy link
Member

@gopidesupavan I had to add /execution/dag-runs/count endpoint too since the ExternalTaskSensor needed it

thanks, will rebase mine, once it merged.

@kaxil kaxil force-pushed the external-task-sensor branch from 25d6863 to 2526882 Compare April 3, 2025 07:41
@kaxil kaxil merged commit 6775bf7 into apache:main Apr 3, 2025
90 checks passed
@kaxil kaxil deleted the external-task-sensor branch April 3, 2025 08:27
Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few comments from initial look

Comment on lines +58 to +60
router = VersionedAPIRouter()

ti_id_router = VersionedAPIRouter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here explaining why we need this one? It will be clear to reader too

Comment on lines +157 to +178
@router.get("/count", status_code=status.HTTP_200_OK)
def get_dr_count(
dag_id: str,
session: SessionDep,
logical_dates: Annotated[list[UtcDateTime] | None, Query()] = None,
run_ids: Annotated[list[str] | None, Query()] = None,
states: Annotated[list[str] | None, Query()] = None,
) -> int:
"""Get the count of DAG runs matching the given criteria."""
query = select(func.count()).select_from(DagRun).where(DagRun.dag_id == dag_id)

if logical_dates:
query = query.where(DagRun.logical_date.in_(logical_dates))

if run_ids:
query = query.where(DagRun.run_id.in_(run_ids))

if states:
query = query.where(DagRun.state.in_(states))

count = session.scalar(query)
return count or 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add cadwyn migration for the new endpoints: https://docs.cadwyn.dev/concepts/version_changes/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. We only need to add a migration for breaking changes (or changes to existing endpoints) from what I understand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had a chat with the Cadwyn Author ( @zmievsa ) who recommends to only add it for breaking changes.

Depending on your needs. My general recommendation is to only add migrations for breaking changes
https://docs.cadwyn.dev/how_to/change_endpoints/#add-a-new-endpoint

Copy link

@zmievsa zmievsa Apr 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make Cadwyn's docs more verbose on when it makes the most sense to add a migration. Concepts section mostly focuses on what's possible with Cadwyn while "how to" focuses on what you should actually do.

Either way 99% of the time it makes sense to add an endpoint to all versions since it's not a breaking change. Your users will thank you later

Update: https://docs.cadwyn.dev/concepts/endpoint_migrations/#defining-endpoints-that-didnt-exist-in-old-versions added a bunch of notes here and there about this.


@router.only_exists_in_older_versions
@router.post(
@router.get("/count", status_code=status.HTTP_200_OK)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add cadwyn migration for the new endpoints: https://docs.cadwyn.dev/concepts/version_changes/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. We only need to add a migration for breaking changes (or changes to existing endpoints) from what I understand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check #48651 (comment) :)

assert response.json() == "2024-01-02T00:00:00Z"


class TestGetCount:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class TestGetCount:
class TestGetTICount:

Comment on lines +1859 to 1863
if AIRFLOW_V_3_0_PLUS:
dag_bag.bag_dag(dag=dag)
else:
dag_bag.bag_dag(dag=dag, root_dag=dag)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets just define a mini utility function, too many usages here

Comment on lines +227 to +228
resp = self.client.get("task-instances/count", params=params)
return TICount(count=resp.json())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safeguard it inside a try/except in case of a 500?

Comment on lines +500 to +501
resp = self.client.get("dag-runs/count", params=params)
return DRCount(count=resp.json())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safeguard in a try/except for cases like 500?

nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
diogotrodrigues pushed a commit to diogotrodrigues/airflow that referenced this pull request Apr 6, 2025
simonprydden pushed a commit to simonprydden/airflow that referenced this pull request Apr 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

4 participants