Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,21 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):

def _resort_file_queue(self):
if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
files, _ = self._sort_by_mtime(self._file_queue)
self._file_queue = deque(files)
# Separate files with pending callbacks from regular files
# Callbacks should stay at the front regardless of mtime
callback_files = []
regular_files = []
for file in self._file_queue:
if file in self._callback_to_execute:
callback_files.append(file)
else:
regular_files.append(file)

# Sort only the regular files by mtime
sorted_regular_files, _ = self._sort_by_mtime(regular_files)

# Put callback files at the front, then sorted regular files
self._file_queue = deque(callback_files + sorted_regular_files)

def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
files_with_mtime: dict[DagFileInfo, float] = {}
Expand Down
36 changes: 36 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,42 @@ def test_resort_file_queue_does_nothing_when_alphabetical(self):
# Order should remain unchanged
assert list(manager._file_queue) == [file_b, file_a]

@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
def test_resort_file_queue_keeps_callbacks_at_front(self):
"""
Check that files with pending callbacks stay at the front of the queue
regardless of their modification time, and preserve their relative order.
"""
files_with_mtime = [
("callback_1.py", 50.0), # has callback, oldest mtime
("callback_2.py", 300.0), # has callback, newest mtime
("regular_1.py", 100.0), # no callback
("regular_2.py", 200.0), # no callback
]
filenames = encode_mtime_in_filename(files_with_mtime)
dag_files = _get_file_infos(filenames)
# dag_files[0] -> callback_1 (mtime 50)
# dag_files[1] -> callback_2 (mtime 300)
# dag_files[2] -> regular_1 (mtime 100)
# dag_files[3] -> regular_2 (mtime 200)

manager = DagFileProcessorManager(max_runs=1)

# Queue order: callback_1, callback_2, regular_1, regular_2
manager._file_queue = deque([dag_files[0], dag_files[1], dag_files[2], dag_files[3]])

# Both callback files have pending callbacks
manager._callback_to_execute[dag_files[0]] = [MagicMock()]
manager._callback_to_execute[dag_files[1]] = [MagicMock()]

manager._resort_file_queue()

# Callback files should stay at front in original order (callback_1, callback_2)
# despite callback_1 having the oldest mtime and callback_2 having the newest
# Regular files should be sorted by mtime (newest first): regular_2 (200), regular_1 (100)
assert list(manager._file_queue) == [dag_files[0], dag_files[1], dag_files[3], dag_files[2]]

@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):
Expand Down