Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions tests/sensors/test_date_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from unittest.mock import patch

import pytest
from parameterized import parameterized

from airflow.models.dag import DAG
from airflow.sensors.date_time import DateTimeSensor
Expand All @@ -35,7 +34,8 @@ def setup_class(cls):
args = {"owner": "airflow", "start_date": DEFAULT_DATE}
cls.dag = DAG("test_dag", default_args=args)

@parameterized.expand(
@pytest.mark.parametrize(
"task_id, target_time, expected",
[
(
"valid_datetime",
Expand All @@ -52,7 +52,7 @@ def setup_class(cls):
"{{ ds }}",
"{{ ds }}",
),
]
],
)
def test_valid_input(self, task_id, target_time, expected):
"""target_time should be a string as it is a template field"""
Expand All @@ -71,7 +71,8 @@ def test_invalid_input(self):
dag=self.dag,
)

@parameterized.expand(
@pytest.mark.parametrize(
"task_id, target_time, expected",
[
(
"poke_datetime",
Expand All @@ -80,12 +81,12 @@ def test_invalid_input(self):
),
("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
("poke_str_basic_with_tz", "20200102T065959+8", True),
]
],
)
@patch(
"airflow.sensors.date_time.timezone.utcnow",
return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
)
def test_poke(self, task_id, target_time, expected, mock_utcnow):
def test_poke(self, mock_utcnow, task_id, target_time, expected):
op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
assert op.poke(None) == expected
79 changes: 38 additions & 41 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import logging
import os
import tempfile
import unittest
import zipfile
from datetime import time, timedelta

Expand Down Expand Up @@ -88,8 +87,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
yield DagZipMaker()


class TestExternalTaskSensor(unittest.TestCase):
def setUp(self):
class TestExternalTaskSensor:
def setup_method(self):
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
self.dag = DAG(TEST_DAG_ID, default_args=self.args)
Expand Down Expand Up @@ -166,8 +165,9 @@ def test_external_task_group_not_exists_without_check_existence(self):
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
external_task_group_id="fake-task-group",
timeout=1,
timeout=0.001,
dag=self.dag,
poke_interval=0.1,
Comment on lines -169 to +170
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default poke_interval is 60 seconds; as a result, the test waits 1 minute for the timeout exception.

)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

Expand Down Expand Up @@ -232,7 +232,7 @@ def test_external_task_sensor_failed_states(self):
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_external_task_sensor_failed_states_as_success(self):
def test_external_task_sensor_failed_states_as_success(self, caplog):
self.add_time_sensor()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
Expand All @@ -242,20 +242,16 @@ def test_external_task_sensor_failed_states_as_success(self):
failed_states=["success"],
dag=self.dag,
)
with self.assertLogs(op.log, level=logging.INFO) as cm:
with pytest.raises(AirflowException) as ctx:
error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in DAG {TEST_DAG_ID} failed\."
with pytest.raises(AirflowException, match=error_message):
with caplog.at_level(logging.INFO, logger=op.log.name):
caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
f"INFO:airflow.task.operators:Poking for tasks ['time_sensor_check'] "
f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
)
assert (
str(ctx.value) == "Some of the external tasks "
"['time_sensor_check'] in DAG "
"unit_test_dag failed."
)
assert (
f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
) in caplog.messages

def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None):
def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
self.add_time_sensor()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
Expand All @@ -277,7 +273,7 @@ def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=N
assert len(task_instances) == 1, "Unexpected number of task instances"
assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"

def test_external_task_sensor_external_task_id_param(self):
def test_external_task_sensor_external_task_id_param(self, caplog):
"""Test external_task_ids is set properly when external_task_id is passed as a template"""
self.add_time_sensor()
op = ExternalTaskSensor(
Expand All @@ -288,14 +284,15 @@ def test_external_task_sensor_external_task_id_param(self):
dag=self.dag,
)

with self.assertLogs(op.log, level=logging.INFO) as cm:
with caplog.at_level(logging.INFO, logger=op.log.name):
caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
)
f"Poking for tasks ['{TEST_TASK_ID}'] "
f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
) in caplog.messages

def test_external_task_sensor_external_task_ids_param(self):
def test_external_task_sensor_external_task_ids_param(self, caplog):
"""Test external_task_ids rendering when a template is passed."""
self.add_time_sensor()
op = ExternalTaskSensor(
Expand All @@ -306,14 +303,15 @@ def test_external_task_sensor_external_task_ids_param(self):
dag=self.dag,
)

with self.assertLogs(op.log, level=logging.INFO) as cm:
with caplog.at_level(logging.INFO, logger=op.log.name):
caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
)
f"Poking for tasks ['{TEST_TASK_ID}'] "
f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
) in caplog.messages

def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, caplog):
self.add_time_sensor(task_id=TEST_TASK_ID)
self.add_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
op = ExternalTaskSensor(
Expand All @@ -324,19 +322,18 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
failed_states=["success"],
dag=self.dag,
)
with self.assertLogs(op.log, level=logging.INFO) as cm:
with pytest.raises(AirflowException) as ctx:
error_message = (
rf"Some of the external tasks \['{TEST_TASK_ID}'\, \'{TEST_TASK_ID_ALTERNATE}\'] "
rf"in DAG {TEST_DAG_ID} failed\."
)
with pytest.raises(AirflowException, match=error_message):
with caplog.at_level(logging.INFO, logger=op.log.name):
caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
f"INFO:airflow.task.operators:Poking for tasks "
f"['time_sensor_check', 'time_sensor_check_alternate'] "
f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
)
assert (
str(ctx.value) == "Some of the external tasks "
"['time_sensor_check', 'time_sensor_check_alternate'] in DAG "
"unit_test_dag failed."
)
assert (
f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] "
f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... "
) in caplog.messages

def test_external_dag_sensor(self):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
Expand Down Expand Up @@ -667,7 +664,7 @@ def test_external_task_sensor_templated(dag_maker, app):
assert f"/dags/dag_{DEFAULT_DATE.date()}/grid" in url


class TestExternalTaskMarker(unittest.TestCase):
class TestExternalTaskMarker:
def test_serialized_fields(self):
assert {"recursion_depth"}.issubset(ExternalTaskMarker.get_serialized_fields())

Expand Down
6 changes: 3 additions & 3 deletions tests/sensors/test_time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import freezegun
import pendulum
import pytest
from parameterized import parameterized

from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
Expand All @@ -41,12 +40,13 @@
return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
)
class TestTimeSensor:
@parameterized.expand(
@pytest.mark.parametrize(
"default_timezone, start_date, expected",
[
("UTC", DEFAULT_DATE_WO_TZ, True),
("UTC", DEFAULT_DATE_WITH_TZ, False),
(DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
]
],
)
def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
Expand Down