Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ repos:
^airflow-ctl.*\.py$|
^airflow-core/src/airflow/models/.*\.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
^airflow-core/tests/unit/models/test_xcom.py$|
^airflow-core/tests/unit/utils/test_db_cleanup.py$|
^dev/airflow_perf/scheduler_dag_execution_timing.py$|
^providers/openlineage/.*\.py$|
Expand Down
15 changes: 8 additions & 7 deletions airflow-core/tests/unit/models/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from unittest.mock import MagicMock

import pytest
from sqlalchemy import delete, func, select

from airflow._shared.timezones import timezone
from airflow.configuration import conf
Expand Down Expand Up @@ -88,7 +89,7 @@ def func(*, dag_id, task_id, logical_date, run_after=None):

def cleanup_database():
# This should also clear task instances by cascading.
session.query(DagRun).filter_by(id=run.id).delete()
session.execute(delete(DagRun).where(DagRun.id == run.id))
session.commit()

request.addfinalizer(cleanup_database)
Expand Down Expand Up @@ -384,7 +385,7 @@ def test_xcom_set(self, session, task_instance, key, value, expected_value):
run_id=task_instance.run_id,
session=session,
)
stored_xcoms = session.query(XComModel).all()
stored_xcoms = session.scalars(select(XComModel)).all()
assert stored_xcoms[0].key == key
assert isinstance(stored_xcoms[0].value, type(json.dumps(expected_value)))
assert stored_xcoms[0].value == json.dumps(expected_value)
Expand All @@ -398,7 +399,7 @@ def setup_for_xcom_set_again_replace(self, task_instance, push_simple_json_xcom)

@pytest.mark.usefixtures("setup_for_xcom_set_again_replace")
def test_xcom_set_again_replace(self, session, task_instance):
assert session.query(XComModel).one().value == json.dumps({"key1": "value1"})
assert session.scalar(select(XComModel)).value == json.dumps({"key1": "value1"})
XComModel.set(
key="xcom_1",
value={"key2": "value2"},
Expand All @@ -407,7 +408,7 @@ def test_xcom_set_again_replace(self, session, task_instance):
run_id=task_instance.run_id,
session=session,
)
assert session.query(XComModel).one().value == json.dumps({"key2": "value2"})
assert session.scalar(select(XComModel)).value == json.dumps({"key2": "value2"})

def test_xcom_set_invalid_key(self, session, task_instance):
"""Test that setting an XCom with an invalid key raises a ValueError."""
Expand Down Expand Up @@ -440,14 +441,14 @@ def setup_for_xcom_clear(self, task_instance, push_simple_json_xcom):
@pytest.mark.usefixtures("setup_for_xcom_clear")
@mock.patch("airflow.models.xcom.XCom.purge")
def test_xcom_clear(self, mock_purge, session, task_instance):
assert session.query(XComModel).count() == 1
assert session.scalar(select(func.count()).select_from(XComModel)) == 1
XComModel.clear(
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
run_id=task_instance.run_id,
session=session,
)
assert session.query(XComModel).count() == 0
assert session.scalar(select(func.count()).select_from(XComModel)) == 0
# purge will not be done when we clear, will be handled in task sdk
assert mock_purge.call_count == 0

Expand All @@ -459,7 +460,7 @@ def test_xcom_clear_different_run(self, session, task_instance):
run_id="different_run",
session=session,
)
assert session.query(XComModel).count() == 1
assert session.scalar(select(func.count()).select_from(XComModel)) == 1


class TestXComRoundTrip:
Expand Down