Commit cfec563

Avoid scheduler/parser manager deadlock by using non-blocking IO
There have been long-standing issues where the scheduler would "stop responding" that we haven't been able to track down. Someone was able to catch the scheduler in this state in 2.0.1 and inspect it with py-spy (thanks, MatthewRBruce!). The stack traces (slightly shortened) were:

```
Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:411)
    send (multiprocessing/connection.py:206)
    send_callback_to_execute (airflow/utils/dag_processing.py:283)
    _send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
    _schedule_dag_run (airflow/jobs/scheduler_job.py:1762)

Process 77: airflow scheduler -- DagFileProcessorManager
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:405)
    send (multiprocessing/connection.py:206)
    _run_parsing_loop (airflow/utils/dag_processing.py:698)
    start (airflow/utils/dag_processing.py:596)
```

What this shows is that both processes are stuck trying to send data to each other: neither can proceed because both send buffers are full, and since both are blocked sending, neither side will ever read and free space in its buffer. A classic deadlock!

The fix for this is twofold:

1) Enable non-blocking IO on the DagFileProcessorManager side. The only thing the Manager sends back up the pipe (now, as of 2.0) is the DagParsingStat object, and the scheduler will happily continue without receiving these, so when a send would block it is better to ignore the error, continue the loop, and try sending again later.

2) Reduce the size of DagParsingStat. With a large number of DAG files we included the full path of each and every one in _each_ parsing stat. The scheduler did nothing with this field, so the object was larger than it needed to be, and such a large object increases the likelihood of hitting the send-buffer-full deadlock.
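To make the mechanism behind fix (1) concrete, here is a minimal standalone sketch (not part of this commit, POSIX only): once the writing end of a pipe is switched to non-blocking mode with `os.set_blocking`, a send into a full buffer raises `BlockingIOError` instead of hanging, so the sender can drop the message and try again on a later loop iteration. The payload size and loop count are arbitrary illustration values.

```python
import multiprocessing
import os

reader, writer = multiprocessing.Pipe(duplex=False)

# Put the writing end into non-blocking mode, mirroring what the
# DagFileProcessorManager now does for its signal connection.
os.set_blocking(writer.fileno(), False)

payload = b"x" * 4096  # arbitrary illustration payload
sent = dropped = 0
for _ in range(1000):
    try:
        writer.send_bytes(payload)
        sent += 1
    except BlockingIOError:
        # The buffer is full and nobody is reading: drop this message and
        # retry later instead of blocking forever -- which is exactly the
        # hang the commit avoids.
        dropped += 1

# Only a handful of messages fit before the kernel buffer fills up.
print(f"sent={sent} dropped={dropped}")
```

This also shows why fix (2) matters: the smaller each message is, the less likely a single send is to find the buffer full in the first place.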
1 parent 6e99ae0 commit cfec563

File tree

3 files changed: +107 -6 lines changed


airflow/utils/dag_processing.py

Lines changed: 22 additions & 3 deletions
```diff
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int
     done: bool
     all_files_processed: bool
 
@@ -515,6 +515,15 @@ def __init__(
         self._async_mode = async_mode
         self._parsing_start_time: Optional[int] = None
 
+        # Set the signal conn into non-blocking mode, so that attempting to
+        # send when the buffer is full errors, rather than hangs forever
+        # attempting to send (this is to avoid deadlocks!)
+        #
+        # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
+        # continue the scheduler
+        if self._async_mode:
+            os.set_blocking(self._signal_conn.fileno(), False)
+
         self._parallelism = conf.getint('scheduler', 'parsing_processes')
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
             self.log.warning(
@@ -623,6 +632,7 @@ def _run_parsing_loop(self):
             ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
             if self._signal_conn in ready:
                 agent_signal = self._signal_conn.recv()
+
                 self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
                 if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
                     self.terminate()
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
+            except BlockingIOError:
+                # Try again next time around the loop!
+
+                # It is better to fail than it is to deadlock. This should
+                # "almost never happen" since the DagParsingStat object is
+                # small, and in async mode this stat is not actually _required_
+                # for normal operation (it only drives "max runs")
+                self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")
 
             if max_runs_reached:
                 self.log.info(
```
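As a rough illustration of fix (2), the stand-in tuples below (field layout taken from the diff above; the paths and count are made up) show how much smaller the stat becomes once it carries only a count instead of every file path. `Connection.send()` pickles its argument, so pickled size is a fair proxy for how much of the connection's finite send buffer each stat consumes.

```python
import pickle
from typing import List, NamedTuple


class OldStat(NamedTuple):
    """Shape of the pre-fix DagParsingStat (illustrative stand-in)."""

    file_paths: List[str]
    done: bool
    all_files_processed: bool


class NewStat(NamedTuple):
    """Shape of the post-fix DagParsingStat (illustrative stand-in)."""

    num_file_paths: int
    done: bool
    all_files_processed: bool


# Made-up paths standing in for a large DAG folder.
paths = [f"/opt/airflow/dags/team_{i}/pipeline_{i}.py" for i in range(5000)]

old_size = len(pickle.dumps(OldStat(paths, False, True)))
new_size = len(pickle.dumps(NewStat(len(paths), False, True)))

# The old stat grows linearly with the number of DAG files; the new one
# stays a few dozen bytes regardless.
print(f"old stat: ~{old_size:,} bytes, new stat: ~{new_size:,} bytes")
```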

pylintrc-tests

Lines changed: 3 additions & 1 deletion
```diff
@@ -443,6 +443,7 @@ good-names=e,
            i,
            j,
            k,
+           n,
            v, # Commonly used when iterating dict.items()
            _,
            ti, # Commonly used in Airflow as shorthand for taskinstance
@@ -454,7 +455,8 @@ good-names=e,
            cm, # Commonly used as shorthand for context manager
            ds, # Used in Airflow templates
            ts, # Used in Airflow templates
-           id # Commonly used as shorthand for identifier
+           id, # Commonly used as shorthand for identifier
+           fd, # aka "file-descriptor" -- common in socket code
 
 # Include a hint for the correct naming format with invalid-name.
 include-naming-hint=no
```

tests/utils/test_dag_processing.py

Lines changed: 82 additions & 2 deletions
```diff
@@ -36,7 +36,7 @@
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.utils import timezone
-from airflow.utils.callback_requests import TaskCallbackRequest
+from airflow.utils.callback_requests import CallbackRequest, TaskCallbackRequest
 from airflow.utils.dag_processing import (
     DagFileProcessorAgent,
     DagFileProcessorManager,
@@ -521,17 +521,97 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        import threading
+
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        import socket
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's buffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+            logging.debug(" Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonably large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()
+            exit_event.set()
+        finally:
+            import logging
+
+            logging.info("Closing pipes")
+            parent_pipe.close()
+            child_pipe.close()
+            thread.join(timeout=1.0)
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):
```
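A side note on the test technique above: shrinking the kernel buffers works because a duplex `multiprocessing.Pipe()` on Linux/macOS is backed by an AF_UNIX socketpair, so its file descriptor can be temporarily wrapped in a `socket.socket` object and tuned with `setsockopt`. A minimal sketch of that trick in isolation (the kernel may round or clamp very small buffer sizes, but the result is still far smaller than the default):

```python
import multiprocessing
import socket

child_conn, parent_conn = multiprocessing.Pipe()  # duplex pipe == AF_UNIX socketpair

# Temporarily wrap the raw file descriptor as a socket object so the kernel
# send/receive buffers can be shrunk for the test...
sock = socket.socket(fileno=parent_conn.fileno())
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
# ...then detach() so that closing or garbage-collecting `sock` does not
# close the descriptor still owned by parent_conn.
sock.detach()

parent_conn.send("ping")  # the connection keeps working after the tweak
print(child_conn.recv())  # -> "ping"
```

Without `detach()`, the temporary socket object would close the shared file descriptor when it is garbage collected and the pipe would break mid-test.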
