Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import importlib.machinery
import importlib.util
import os
import signal
import sys
import textwrap
import traceback
Expand Down Expand Up @@ -358,6 +359,17 @@ def get_pools(dag) -> dict[str, set[str]]:
def _load_modules_from_file(self, filepath, safe_mode):
from airflow.sdk.definitions._internal.contextmanager import DagContext

def handler(signum, frame):
"""Handle SIGSEGV signal and let the user know that the import failed."""
msg = f"Received SIGSEGV signal while processing {filepath}."
self.log.error(msg)
self.import_errors[filepath] = msg

try:
signal.signal(signal.SIGSEGV, handler)
except ValueError:
self.log.warning("SIGSEGV signal handler registration failed. Not in the main thread")

if not might_contain_dag(filepath, safe_mode):
# Don't want to spam user with skip messages
if not self.has_logged:
Expand Down
53 changes: 53 additions & 0 deletions airflow-core/tests/unit/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,3 +959,56 @@ def test_dag_warnings_invalid_pool(self, known_pools, expected):
dagbag = DagBag(dag_folder="", include_examples=False, collect_dags=False, known_pools=known_pools)
dagbag.bag_dag(dag)
assert dagbag.dag_warnings == expected

def test_sigsegv_handling(self, tmp_path, caplog):
"""
Test that a SIGSEGV in a DAG file is handled gracefully and does not crash the process.
"""
# Create a DAG file that will raise a SIGSEGV
dag_file = tmp_path / "bad_dag.py"
dag_file.write_text(
textwrap.dedent(
"""\
import signal
from airflow import DAG
import os
from airflow.decorators import task

os.kill(os.getpid(), signal.SIGSEGV)

with DAG('testbug'):
@task
def mytask():
print(1)
mytask()
"""
)
)

dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "Received SIGSEGV signal while processing" in caplog.text
assert dag_file.as_posix() in dagbag.import_errors

def test_failed_signal_registration_does_not_crash_the_process(self, tmp_path, caplog):
"""Test that a ValueError raised by a signal setting on child process does not crash the main process.
This was raised in test_dag_report.py module in api_fastapi/core_api/routes/public tests
"""
dag_file = tmp_path / "test_dag.py"
dag_file.write_text(
textwrap.dedent(
"""\
from airflow import DAG
from airflow.decorators import task

with DAG('testbug'):
@task
def mytask():
print(1)
mytask()
"""
)
)
with mock.patch("airflow.models.dagbag.signal.signal") as mock_signal:
mock_signal.side_effect = ValueError("Invalid signal setting")
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the main thread" in caplog.text