Fix missing dag_id in get_task_instance (#64957)#64968
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
|
There was a problem hiding this comment.
Correctness: Verified. All 4 existing callers of TaskInstance.get_task_instance() already pass dag_id — the method signature is unchanged. The only other similar query on the model (refresh_from_db) already included dag_id, so this was the sole gap.
Test: The tasks_with_different_dags case creates the exact collision scenario and would fail with MultipleResultsFound on pre-fix code.
One suggestion for making the regression test more self-documenting (non-blocking — see inline comment).
(Review comments generated with the help of Claude)
Vamsi-klu
left a comment
There was a problem hiding this comment.
The fix is correct. I traced through the code to confirm the bug is realistic and not just theoretical. The run_id for scheduled runs is generated as "scheduled__" (dagrun.py:822), and the dag_run table's unique constraint is on (dag_id, run_id), not run_id alone. So two different DAGs on the same schedule will produce identical run_id values. If they also share a task name (which is common with standardized patterns like "extract" or "load"), the old query without dag_id would match both rows and scalar_one_or_none raises MultipleResultsFound.
All four callers of this classmethod (edge executor, edge logs route, and two test utilities) pass dag_id and expect it to filter correctly, so this was a real production path.
On the test, the "tasks_with_different_dags" parametrized case is the one that actually exercises the fix. The "tasks_with_different_runs" case tests rows that already differ by run_id, which was in the filter before the fix, so that case would have passed even without the change. Might be worth adding a comment in the test noting that the second case is just a sanity check rather than a regression test for this specific bug, so future readers understand the intent.
The test name test_instance_dag_and_run_id_uniqueness reads like it is testing a database constraint. Something like test_get_task_instance_filters_by_dag_id would make it clearer that the method under test is get_task_instance and the thing being verified is that dag_id is actually used in the query.
Building on what Dev-iL suggested, adding assert found_1.dag_id == dag_id_1 alongside the id comparison would make the assertion more self-documenting. The id check proves the right row came back, but the dag_id check makes it immediately obvious what property is being verified.
2fa8fc0 to
4f8ea47
Compare
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Fixes TaskInstance.get_task_instance() returning ambiguous results by ensuring the lookup filter includes dag_id, and adds a regression test to cover the composite-uniqueness behavior described in #64957.
Changes:
- Add
dag_idto the SQLAlchemy.filter_by(...)criteria inTaskInstance.get_task_instance(). - Add a parametrized unit test that creates two TaskInstances differing by
dag_idorrun_idand verifies they can be fetched uniquely.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/models/taskinstance.py | Ensures get_task_instance() filters by dag_id to match the TI composite identity. |
| airflow-core/tests/unit/models/test_taskinstance.py | Adds a regression test validating get_task_instance() correctly disambiguates TaskInstances. |
4f8ea47 to
1ed5afe
Compare
jscheffl
left a comment
There was a problem hiding this comment.
Oh, this is also a performance impact because w/o dag_id it can not use the index efficiently! LGTM!
|
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions. |
Backport successfully created: v3-2-testNote: As of Merging PRs targeted for Airflow 3.X In matter of doubt please ask in #release-management Slack channel.
|
Closes #64957
Closes #65044
TaskInstance uniqueness is defined by the composite key (dag_id, task_id, run_id, map_index) (see the SQLAlchemy model):
airflow/airflow-core/src/airflow/models/taskinstance.py
Lines 591 to 592 in f4f48b9
The problem is that the filter in
get_task_instancedoes not includedag_id, violating this invariant. This PR fixes the issue and adds a regression test (test_get_task_instance_disambiguates_by_dag_id_and_run_id) that checks that TIs with differentdag_idorrun_idcan be fetched without exceptions.Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.