32 changes: 25 additions & 7 deletions airflow/utils/dag_processing.py
@@ -141,7 +141,6 @@ def waitable_handle(self):
class DagParsingStat(NamedTuple):
"""Information on processing progress"""

file_paths: List[str]
done: bool
all_files_processed: bool

@@ -515,6 +514,15 @@ def __init__(
self._async_mode = async_mode
self._parsing_start_time: Optional[int] = None

# Set the signal conn into non-blocking mode, so that attempting to
# send when the buffer is full errors rather than hangs forever
# attempting to send (this is to avoid deadlocks!)
#
# Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
# continue the scheduler
if self._async_mode:
os.set_blocking(self._signal_conn.fileno(), False)

self._parallelism = conf.getint('scheduler', 'parsing_processes')
if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
self.log.warning(
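
For context (not part of the diff): a minimal standalone sketch of the behaviour the change above relies on. Once a multiprocessing Connection's file descriptor is put into non-blocking mode with os.set_blocking, send() raises BlockingIOError instead of hanging when the OS buffer fills up. The names parent_conn/child_conn are only illustrative:

import multiprocessing
import os

parent_conn, child_conn = multiprocessing.Pipe()
os.set_blocking(child_conn.fileno(), False)

sent = 0
try:
    # Nothing ever reads from parent_conn, so the buffer eventually fills
    while True:
        child_conn.send(b"x" * 1024)
        sent += 1
except BlockingIOError:
    # With a blocking fd this loop would hang here forever instead of raising
    print(f"send() raised after {sent} sends rather than blocking")
finally:
    parent_conn.close()
    child_conn.close()
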
@@ -623,6 +631,7 @@ def _run_parsing_loop(self):
ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
if self._signal_conn in ready:
agent_signal = self._signal_conn.recv()

self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
self.terminate()
@@ -695,12 +704,21 @@ def _run_parsing_loop(self):
all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
max_runs_reached = self.max_runs_reached()

dag_parsing_stat = DagParsingStat(
self._file_paths,
max_runs_reached,
all_files_processed,
)
self._signal_conn.send(dag_parsing_stat)
try:
self._signal_conn.send(
DagParsingStat(
max_runs_reached,
all_files_processed,
)
)
except BlockingIOError:
Review comment — @XD-DENG (Member), Mar 31, 2021:
Any possibility of other error type that we may want to ignore? Again, may be a dumb question.

Reply from the PR author (Member):
I don't think so, no. Anything else is an "error" condition, and we want to die (and the scheduler will notice and restart the manager process).

# Try again next time around the loop!

# It is better to fail than to deadlock. This should
# "almost never happen" since the DagParsingStat object is
# small, and in async mode this stat is not actually _required_
# for normal operation (it only drives "max runs")
self.log.debug("BlockingIOError recived trying to send DagParsingStat, ignoring")

if max_runs_reached:
self.log.info(
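
To illustrate the review exchange above (a hedged sketch, not code from this PR; behaviour as on POSIX systems): on a non-blocking connection a full buffer surfaces as BlockingIOError, which is safe to skip until the next loop, whereas a peer that has gone away surfaces as a different error such as BrokenPipeError, which is deliberately left uncaught so the manager dies and the scheduler restarts it:

import multiprocessing
import os

parent_conn, child_conn = multiprocessing.Pipe()
os.set_blocking(child_conn.fileno(), False)
parent_conn.close()  # simulate the other side going away

try:
    child_conn.send("stat")
except BlockingIOError:
    print("buffer full -- drop this stat and retry on the next loop")
except BrokenPipeError:
    print("peer is gone -- the PR does not catch this, so the manager exits")
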
4 changes: 3 additions & 1 deletion pylintrc-tests
@@ -443,6 +443,7 @@ good-names=e,
i,
j,
k,
n,
v, # Commonly used when iterating dict.items()
_,
ti, # Commonly used in Airflow as shorthand for taskinstance
@@ -454,7 +455,8 @@ good-names=e,
cm, # Commonly used as shorthand for context manager
ds, # Used in Airflow templates
ts, # Used in Airflow templates
id # Commonly used as shorthand for identifier
id, # Commonly used as shorthand for identifier
fd, # aka "file-descriptor" -- common in socket code

# Include a hint for the correct naming format with invalid-name.
include-naming-hint=no
80 changes: 78 additions & 2 deletions tests/utils/test_dag_processing.py
@@ -16,11 +16,14 @@
# specific language governing permissions and limitations
# under the License.

import logging
import multiprocessing
import os
import pathlib
import random
import socket
import sys
import threading
import unittest
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
@@ -36,7 +39,7 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.utils import timezone
from airflow.utils.callback_requests import TaskCallbackRequest
from airflow.utils.callback_requests import CallbackRequest, TaskCallbackRequest
from airflow.utils.dag_processing import (
DagFileProcessorAgent,
DagFileProcessorManager,
@@ -521,17 +524,90 @@ def test_dag_with_system_exit(self):

manager._run_parsing_loop()

result = None
while parent_pipe.poll(timeout=None):
result = parent_pipe.recv()
if isinstance(result, DagParsingStat) and result.done:
break

# Three files in folder should be processed
assert len(result.file_paths) == 3
assert sum(stat.run_count for stat in manager._file_stats.values()) == 3

with create_session() as session:
assert session.query(DagModel).get(dag_id) is not None

@conf_vars({('core', 'load_examples'): 'False'})
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.execution_timeout(30)
def test_pipe_full_deadlock(self):
dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"

child_pipe, parent_pipe = multiprocessing.Pipe()

# Shrink the buffers to exacerbate the problem!
for fd in (parent_pipe.fileno(),):
sock = socket.socket(fileno=fd)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
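# detach() hands the fd back without closing it, so the pipe connection stays usable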
sock.detach()

exit_event = threading.Event()

# To test this behaviour we need something that continually fills the
# parent pipe's buffer (and keeps it full).
def keep_pipe_full(pipe, exit_event):
n = 0
while True:
if exit_event.is_set():
break

req = CallbackRequest(str(dag_filepath))
try:
logging.debug("Sending CallbackRequests %d", n + 1)
pipe.send(req)
except TypeError:
# This is actually the error you get when the parent pipe
# is closed! Nicely handled, eh?
break
except OSError:
break
n += 1
logging.debug(" Sent %d CallbackRequests", n)

thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))

fake_processors = []

def fake_processor_factory(*args, **kwargs):
nonlocal fake_processors
processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
fake_processors.append(processor)
return processor

manager = DagFileProcessorManager(
dag_directory=dag_filepath,
dag_ids=[],
# A reasonably large number to ensure that we trigger the deadlock
max_runs=100,
processor_factory=fake_processor_factory,
processor_timeout=timedelta(seconds=5),
signal_conn=child_pipe,
pickle_dags=False,
async_mode=True,
)

try:
thread.start()

# If this completes without hanging, then the test is good!
manager._run_parsing_loop()
exit_event.set()
finally:
logging.info("Closing pipes")
parent_pipe.close()
child_pipe.close()
thread.join(timeout=1.0)


class TestDagFileProcessorAgent(unittest.TestCase):
def setUp(self):