Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions airflow/example_dags/example_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,11 @@ def failure_callable():
task_id="create_file_after_3_seconds", bash_command="sleep 3; touch /tmp/temporary_file_for_testing"
)

# [START example_python_sensors]
t8 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)

t9 = PythonSensor(
task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
)
# [END example_python_sensors]

# [START example_day_of_week_sensor]
t10 = DayOfWeekSensor(
Expand Down
39 changes: 32 additions & 7 deletions docs/apache-airflow/howto/operator/python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,36 @@ Jinja templating can be used in same way as described for the PythonOperator.
PythonSensor
============

Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary callable for sensing. The callable
should return True when it succeeds, False otherwise.
A PythonSensor waits for a certain condition to be ``True``, for example to wait for a file to exist. The
PythonSensor is available via ``@task.sensor`` and ``airflow.sensors.python.PythonSensor``. The callable
should return a boolean ``True`` or ``False``, indicating whether a condition is met. For example:

.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_python_sensors]
:end-before: [END example_python_sensors]
.. code-block:: python

import datetime

from airflow.decorators import dag, task
from airflow.sensors.python import PythonSensor


@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
def example():

# TaskFlow sensor
@task.sensor
def wait_for_success_taskflow():
return datetime.datetime.now().minute % 2 == 0

wait_for_success_taskflow()

# Equivalent functionality using PythonSensor class
def wait_for_success_pythonsensor():
return datetime.datetime.now().minute % 2 == 0

PythonSensor(task_id="wait_for_success_pythonsensor", python_callable=wait_for_success_pythonsensor)


example()

This code sample will give two tasks waiting for the current minute to be even; (1) TaskFlow API sensor using
``@task.sensor`` and (2) classic PythonSensor referring to the callable.