-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Make ExternalTaskSensor work with Task SDK
#48651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
c2e5875 to
b57ab8c
Compare
|
Oh thanks kaxil , wrote similar way for the |
d272433 to
7566433
Compare
|
@gopidesupavan I had to add |
7566433 to
25d6863
Compare
thanks, will rebase mine, once it merged. |
25d6863 to
2526882
Compare
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few comments from initial look
| router = VersionedAPIRouter() | ||
|
|
||
| ti_id_router = VersionedAPIRouter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment here explaining why we need this one? It will be clear to reader too
| @router.get("/count", status_code=status.HTTP_200_OK) | ||
| def get_dr_count( | ||
| dag_id: str, | ||
| session: SessionDep, | ||
| logical_dates: Annotated[list[UtcDateTime] | None, Query()] = None, | ||
| run_ids: Annotated[list[str] | None, Query()] = None, | ||
| states: Annotated[list[str] | None, Query()] = None, | ||
| ) -> int: | ||
| """Get the count of DAG runs matching the given criteria.""" | ||
| query = select(func.count()).select_from(DagRun).where(DagRun.dag_id == dag_id) | ||
|
|
||
| if logical_dates: | ||
| query = query.where(DagRun.logical_date.in_(logical_dates)) | ||
|
|
||
| if run_ids: | ||
| query = query.where(DagRun.run_id.in_(run_ids)) | ||
|
|
||
| if states: | ||
| query = query.where(DagRun.state.in_(states)) | ||
|
|
||
| count = session.scalar(query) | ||
| return count or 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to add cadwyn migration for the new endpoints: https://docs.cadwyn.dev/concepts/version_changes/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. We only need to add a migration for breaking changes (or changes to existing endpoints) from what I understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just checked it here: https://docs.cadwyn.dev/concepts/endpoint_migrations/#defining-endpoints-that-didnt-exist-in-old-versions. Seems we will need it. Let me take it up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just had a chat with the Cadwyn Author ( @zmievsa ) who recommends to only add it for breaking changes.
Depending on your needs. My general recommendation is to only add migrations for breaking changes
https://docs.cadwyn.dev/how_to/change_endpoints/#add-a-new-endpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make Cadwyn's docs more verbose on when it makes the most sense to add a migration. Concepts section mostly focuses on what's possible with Cadwyn while "how to" focuses on what you should actually do.
Either way 99% of the time it makes sense to add an endpoint to all versions since it's not a breaking change. Your users will thank you later
Update: https://docs.cadwyn.dev/concepts/endpoint_migrations/#defining-endpoints-that-didnt-exist-in-old-versions added a bunch of notes here and there about this.
|
|
||
| @router.only_exists_in_older_versions | ||
| @router.post( | ||
| @router.get("/count", status_code=status.HTTP_200_OK) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to add cadwyn migration for the new endpoints: https://docs.cadwyn.dev/concepts/version_changes/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. We only need to add a migration for breaking changes (or changes to existing endpoints) from what I understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as #48651 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check #48651 (comment) :)
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
Show resolved
Hide resolved
| assert response.json() == "2024-01-02T00:00:00Z" | ||
|
|
||
|
|
||
| class TestGetCount: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| class TestGetCount: | |
| class TestGetTICount: |
providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
Show resolved
Hide resolved
| if AIRFLOW_V_3_0_PLUS: | ||
| dag_bag.bag_dag(dag=dag) | ||
| else: | ||
| dag_bag.bag_dag(dag=dag, root_dag=dag) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets just define a mini utility function, too many usages here
| resp = self.client.get("task-instances/count", params=params) | ||
| return TICount(count=resp.json()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Safeguard it inside a try/except in case of a 500?
| resp = self.client.get("dag-runs/count", params=params) | ||
| return DRCount(count=resp.json()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Safeguard in a try/except for cases like 500?
closes #47447
closes #47948
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.