Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,8 @@ def _execute_callable(context: Context, **execute_callable_kwargs):


def _set_ti_attrs(target, source, include_dag_run=False):
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic

# Fields ordered per model definition
target.start_date = source.start_date
target.end_date = source.end_date
Expand All @@ -826,6 +828,8 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.trigger_id = source.trigger_id
target.next_method = source.next_method
target.next_kwargs = source.next_kwargs
if source.note and isinstance(source, TaskInstancePydantic):
target.note = source.note

if include_dag_run:
target.execution_date = source.execution_date
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
dag_model: Optional[DagModelPydantic]
raw: Optional[bool]
is_trigger_log_context: Optional[bool]
note: Optional[str] = None
model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

@property
Expand Down
65 changes: 65 additions & 0 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.sensors.python import PythonSensor
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
from airflow.settings import TIMEZONE, TracebackSessionForTests, reconfigure_orm
from airflow.stats import Stats
Expand Down Expand Up @@ -5421,6 +5422,70 @@ def test_taskinstance_with_note(create_task_instance, session):
assert session.query(TaskInstanceNote).filter_by(**filter_kwargs).one_or_none() is None


def test_taskinstance_with_note_pydantic(create_task_instance, session):
ti = create_task_instance(
dag_id="dag_for_testing_with_note_pydantic",
task_id="task_for_testing_with_note_pydantic",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)

ti_pydantic = TaskInstancePydantic(
task_id=ti.task_id,
dag_id=ti.dag_id,
run_id=ti.run_id,
map_index=ti.map_index,
start_date=ti.start_date,
end_date=ti.end_date,
execution_date=ti.execution_date,
duration=0.1,
state="success",
try_number=ti.try_number,
max_tries=ti.max_tries,
hostname="host",
unixname="unix",
job_id=ti.job_id,
pool=ti.pool,
pool_slots=ti.pool_slots,
queue=ti.queue,
priority_weight=ti.priority_weight,
operator=ti.operator,
custom_operator_name=ti.custom_operator_name,
queued_dttm=timezone.utcnow(),
queued_by_job_id=3,
pid=12345,
executor=ti.executor,
executor_config=None,
updated_at=timezone.utcnow(),
rendered_map_index=ti.rendered_map_index,
external_executor_id="x",
trigger_id=ti.trigger_id,
trigger_timeout=timezone.utcnow(),
next_method="bla",
next_kwargs=None,
run_as_user=None,
task=ti.task,
test_mode=False,
dag_run=ti.dag_run,
dag_model=ti.dag_model,
raw=False,
is_trigger_log_context=False,
note="ti with note",
)

TaskInstance.save_to_db(ti_pydantic, session)

filter_kwargs = dict(
dag_id=ti_pydantic.dag_id,
task_id=ti_pydantic.task_id,
run_id=ti_pydantic.run_id,
map_index=ti_pydantic.map_index,
)

ti_note: TaskInstanceNote = session.query(TaskInstanceNote).filter_by(**filter_kwargs).one()
assert ti_note.content == "ti with note"


def test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
with dag_maker():
BashOperator(task_id="hello", bash_command="hi")
Expand Down
Loading