Merged
39 changes: 18 additions & 21 deletions airflow-core/docs/core-concepts/executor/local.rst
@@ -21,32 +21,29 @@
Local Executor
==============

:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by spawning processes in a controlled fashion in different modes.
:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by spawning processes in a controlled fashion on the scheduler node.

Given that BaseExecutor has the option to receive a ``parallelism`` parameter to limit the number of process spawned,
when this parameter is ``0`` the number of processes that LocalExecutor can spawn is unlimited.
The ``parallelism`` parameter limits the number of processes spawned so as not to overwhelm the node.
It must be greater than ``0``.

The following strategies are implemented:

- | **Unlimited Parallelism** (``self.parallelism == 0``): In this strategy, LocalExecutor will
| spawn a process every time ``execute_async`` is called, that is, every task submitted to the
| :class:`~airflow.executors.local_executor.LocalExecutor` will be executed in its own process. Once the task is executed and the
| result stored in the ``result_queue``, the process terminates. There is no need for a
| ``task_queue`` in this approach, since as soon as a task is received a new process will be
| allocated to the task. Processes used in this strategy are of class :class:`~airflow.executors.local_executor.LocalWorker`.

- | **Limited Parallelism** (``self.parallelism > 0``): In this strategy, the :class:`~airflow.executors.local_executor.LocalExecutor` spawns
| the number of processes equal to the value of ``self.parallelism`` at ``start`` time,
| using a ``task_queue`` to coordinate the ingestion of tasks and the work distribution among
| the workers, which will take a task as soon as they are ready. During the lifecycle of
| the LocalExecutor, the worker processes are running waiting for tasks, once the
| LocalExecutor receives the call to shutdown the executor a poison token is sent to the
| workers to terminate them. Processes used in this strategy are of class :class:`~airflow.executors.local_executor.QueuedLocalWorker`.
The :class:`~airflow.executors.local_executor.LocalExecutor` spawns a number of processes equal to the value of ``self.parallelism`` at
``start`` time, using a ``task_queue`` to coordinate the ingestion of tasks and the distribution of work among the workers, which pick up
a task as soon as they are ready. During the lifecycle of the LocalExecutor, the worker processes run waiting for tasks; once the
LocalExecutor receives the call to shut down, a poison token is sent to the workers to terminate them. Processes used in this
strategy are of class :class:`~airflow.executors.local_executor.QueuedLocalWorker`.
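The queue-and-poison-token pattern described above can be sketched as a minimal standalone analogue (using threads for brevity, whereas Airflow's executor uses the ``multiprocessing`` library; the names ``POISON``, ``worker``, and ``run`` are illustrative, not Airflow's API):

```python
import queue
import threading

POISON = None  # sentinel token telling a worker to exit


def worker(task_queue, result_queue):
    # Block waiting for tasks; terminate when the poison token arrives.
    while True:
        task = task_queue.get()
        if task is POISON:
            break
        result_queue.put((task, task * 2))  # stand-in for executing the task


def run(tasks, parallelism):
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    # Spawn a fixed pool of `parallelism` workers up front, as LocalExecutor
    # does with QueuedLocalWorker processes at start time.
    workers = [
        threading.Thread(target=worker, args=(task_queue, result_queue))
        for _ in range(parallelism)
    ]
    for w in workers:
        w.start()
    for t in tasks:
        task_queue.put(t)
    for _ in workers:  # one poison token per worker triggers shutdown
        task_queue.put(POISON)
    for w in workers:
        w.join()
    return [result_queue.get() for _ in tasks]
```

Sending one token per worker is what makes shutdown deterministic: each worker consumes exactly one token and exits, regardless of which worker ran which task.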

.. note::

When multiple Schedulers are configured with ``executor = LocalExecutor`` in the ``[core]`` section of your ``airflow.cfg``, each
Scheduler will run a LocalExecutor. This means tasks will be processed in a distributed fashion across the machines running the
Schedulers.

One consideration should be taken into account:

- Restarting a Scheduler: If a Scheduler is restarted, it may take some time for other Schedulers to recognize the orphaned tasks
  and restart or fail them.

.. note::

Previous versions of Airflow had the option to configure the LocalExecutor with unlimited parallelism
(``self.parallelism = 0``). This option has been removed in Airflow 3.0.0 to avoid overwhelming the scheduler node.
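With unlimited parallelism removed, a configuration for this executor pins an explicit bound (``executor`` and ``parallelism`` are existing ``[core]`` options; the value ``32`` below is purely illustrative):

```ini
[core]
executor = LocalExecutor
# Upper bound on task processes the executor may run concurrently; must be > 0
parallelism = 32
```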
11 changes: 3 additions & 8 deletions airflow-core/src/airflow/executors/local_executor.py
@@ -35,7 +35,7 @@
from typing import TYPE_CHECKING

from airflow.executors import workloads
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import TaskInstanceState

# add logger to parameter of setproctitle to support logging
@@ -138,7 +138,7 @@ class LocalExecutor(BaseExecutor):

It uses the multiprocessing Python library and queues to parallelize the execution of tasks.

:param parallelism: how many parallel processes are run in the executor
:param parallelism: how many parallel processes are run in the executor, must be > 0
"""

is_local: bool = True
@@ -150,11 +150,6 @@ class LocalExecutor(BaseExecutor):
workers: dict[int, multiprocessing.Process]
_unread_messages: multiprocessing.sharedctypes.Synchronized[int]

def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
if self.parallelism < 0:
raise ValueError("parallelism must be greater than or equal to 0")

def start(self) -> None:
"""Start the executor."""
# We delay opening these queues until the start method mostly for unit tests. ExecutorLoader caches
@@ -190,7 +185,7 @@ def _check_workers(self):
# If we're using spawn in multiprocessing (default on macOS now) to start tasks, this can get called a
# via `sync()` a few times before the spawned process actually starts picking up messages. Try not to
# create too much
if num_outstanding and (self.parallelism == 0 or len(self.workers) < self.parallelism):
if num_outstanding and len(self.workers) < self.parallelism:
# This only creates one worker, which is fine as we call this directly after putting a message on
# activity_queue in execute_async
self._spawn_worker()
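The simplified spawn check above can be sketched in isolation; ``num_outstanding``, ``workers``, and ``spawn_worker`` here are stand-ins for the executor's internals, not Airflow's actual API:

```python
def check_workers(num_outstanding: int, workers: dict, parallelism: int, spawn_worker) -> bool:
    """Spawn at most one worker per call, and only while under the parallelism cap."""
    if num_outstanding and len(workers) < parallelism:
        spawn_worker()  # the caller is responsible for registering the new worker
        return True
    return False
```

With the ``parallelism == 0`` branch gone, worker count is now strictly bounded: once ``len(workers)`` reaches ``parallelism``, outstanding messages simply wait for an existing worker to free up.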
15 changes: 4 additions & 11 deletions airflow-core/tests/unit/executors/test_local_executor.py
@@ -55,8 +55,9 @@ def test_is_local_default_value(self):
def test_serve_logs_default_value(self):
assert LocalExecutor.serve_logs

@skip_spawn_mp_start
@mock.patch("airflow.sdk.execution_time.supervisor.supervise")
def _test_execute(self, mock_supervise, parallelism=1):
def test_execution(self, mock_supervise):
success_tis = [
workloads.TaskInstance(
id=uuid7(),
@@ -84,7 +85,7 @@ def fake_supervise(ti, **kwargs):

mock_supervise.side_effect = fake_supervise

executor = LocalExecutor(parallelism=parallelism)
executor = LocalExecutor(parallelism=2)
executor.start()

assert executor.result_queue.empty()
@@ -118,7 +119,7 @@ def fake_supervise(ti, **kwargs):

executor.end()

expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
expected = 2
# Depending on how quickly the tasks run, we might not need to create all the workers we could
assert 1 <= len(spawn_worker.calls) <= expected

@@ -130,14 +131,6 @@ def fake_supervise(ti, **kwargs):
assert executor.event_buffer[ti.key][0] == State.SUCCESS
assert executor.event_buffer[fail_ti.key][0] == State.FAILED

@skip_spawn_mp_start
@pytest.mark.parametrize(
("parallelism",),
[pytest.param(2, id="limited")],
)
def test_execution(self, parallelism: int):
self._test_execute(parallelism=parallelism)

@mock.patch("airflow.executors.local_executor.LocalExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.Stats.gauge")
@@ -117,16 +117,6 @@ def test_no_spawn_if_parallelism_reached(setup_executor):
executor._spawn_worker.assert_not_called()


def test_parallelism_zero_spawns_worker(setup_executor):
executor = setup_executor
executor.parallelism = 0
executor._unread_messages.value = 1
executor.activity_queue.empty.return_value = False
executor.workers = {}
executor._check_workers()
executor._spawn_worker.assert_called_once()


def test_spawn_worker_when_we_have_parallelism_left(setup_executor):
executor = setup_executor
# Simulate 4 running workers