-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Make dag_version_id in TI non-nullable #46703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| from airflow.listeners.listener import get_listener_manager | ||
| from airflow.models import TaskInstance | ||
| from airflow.models.dag import DAG, _run_inline_trigger | ||
| from airflow.models.dag_version import DagVersion | ||
| from airflow.models.dagrun import DagRun | ||
| from airflow.models.taskinstance import TaskReturnCode | ||
| from airflow.sdk.definitions.param import ParamsDict | ||
|
|
@@ -211,7 +212,10 @@ def _get_ti( | |
| f"run_id or logical_date of {logical_date_or_run_id!r} not found" | ||
| ) | ||
| # TODO: Validate map_index is in range? | ||
| ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index) | ||
| dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) | ||
| if TYPE_CHECKING: | ||
| assert dag_version | ||
| ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index, dag_version_id=dag_version.id) | ||
|
Comment on lines
+215
to
+218
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this pattern is used so often, maybe it’s better for
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or, maybe TI init can handle this for folks?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jedcunningham, do you mean, if it's not provided, use the latest dag version ID in init? The only thing is that session is not provided in init. |
||
| if dag_run in session: | ||
| session.add(ti) | ||
| ti.dag_run = dag_run | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -141,7 +141,7 @@ def upgrade(): | |
| batch_op.add_column(sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow)) | ||
|
|
||
| with op.batch_alter_table("task_instance", schema=None) as batch_op: | ||
| batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) | ||
| batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens to old tis created before this column is added? I think we need some data migration work here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. Yeah, quite needed |
||
| batch_op.create_foreign_key( | ||
| batch_op.f("task_instance_dag_version_id_fkey"), | ||
| "dag_version", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| 617116c74735faa69a297fe665664b691f341487bc8dcbef3e3ec7e76cdea799 | ||
| 9cc3230fc60a08ab5be6779b9fec70089bf2f9eeeb120a4f746628e2e7582989 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
|
|
||
| import os | ||
| import sys | ||
| import uuid | ||
| from collections.abc import Iterable, Mapping | ||
| from datetime import datetime, timezone | ||
| from io import FileIO | ||
|
|
@@ -92,6 +93,7 @@ class RuntimeTaskInstance(TaskInstance): | |
| model_config = ConfigDict(arbitrary_types_allowed=True) | ||
|
|
||
| task: BaseOperator | ||
| dag_version_id: uuid.UUID | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same with these in task_sdk? |
||
| _ti_context_from_server: Annotated[TIRunContext | None, Field(repr=False)] = None | ||
| """The Task Instance context from the API server, if any.""" | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this during runtime though? Not seeing where it is used.