Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class DagFileStat:
last_duration: float | None = None
run_count: int = 0
last_num_of_db_queries: int = 0
last_mtime: float | None = None


@dataclass(frozen=True)
Expand Down Expand Up @@ -989,8 +990,28 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):

def _resort_file_queue(self):
if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
files, _ = self._sort_by_mtime(self._file_queue)
self._file_queue = deque(files)
files_with_mtime: dict[DagFileInfo, float] = {}
mtime_changed = False

for file in list(self._file_queue):
try:
mtime = os.path.getmtime(file.absolute_path)
files_with_mtime[file] = mtime
stat = self._file_stats[file] # Creates entry via defaultdict if missing
if stat.last_mtime != mtime:
mtime_changed = True
stat.last_mtime = mtime
except FileNotFoundError:
self.log.warning("Skipping processing of missing file: %s", file)
self._file_stats.pop(file, None)
mtime_changed = True # Queue structure changed

if not mtime_changed:
return # No changes, skip sorting

# Sort by mtime descending and rebuild queue
sorted_files = [f for f, _ in sorted(files_with_mtime.items(), key=itemgetter(1), reverse=True)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this put new files at the end of the list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. This is replicating what _resort_by_mtime does but optimizing by avoiding unnecessory resorting.

New files would have most recent mtimes which is higher thus processed first since it's by descending order. Older ones will be done last

self._file_queue = deque(sorted_files)

def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
files_with_mtime: dict[DagFileInfo, float] = {}
Expand Down
33 changes: 32 additions & 1 deletion airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def test_add_new_files_to_queue_behavior(self):
assert list(manager._file_queue) == [file_2, file_1]

@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
@mock.patch("airflow.dag_processing.manager.os.path.getmtime", new=mock_get_mtime)
def test_resort_file_queue_by_mtime(self):
"""
Check that existing files in the queue are re-sorted by mtime when calling _resort_file_queue,
Expand All @@ -416,6 +416,37 @@ def test_resort_file_queue_by_mtime(self):
# Verify resort happened: [file_2 (200), file_1 (100)]
assert list(manager._file_queue) == [dag_files[1], dag_files[0]]

@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.dag_processing.manager.os.path.getmtime", new=mock_get_mtime)
def test_resort_file_queue_skips_sort_when_mtimes_unchanged(self):
# Prepare some files with mtimes
files_with_mtime = [
("file_1.py", 100.0),
("file_2.py", 200.0),
]
filenames = encode_mtime_in_filename(files_with_mtime)
dag_files = _get_file_infos(filenames)

manager = DagFileProcessorManager(max_runs=1)

# Populate queue and resort
manager._file_queue = deque([dag_files[0], dag_files[1]])
manager._resort_file_queue()

# Verify initial sort happened
assert list(manager._file_queue) == [dag_files[1], dag_files[0]]

# Store reference to the queue object
original_queue = manager._file_queue

# Call resort again with same mtimes
manager._resort_file_queue()

# Queue should be the exact same object (not recreated) since mtimes didn't change
assert manager._file_queue is original_queue
# And contents should still be correct
assert list(manager._file_queue) == [dag_files[1], dag_files[0]]

@conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
def test_resort_file_queue_does_nothing_when_alphabetical(self):
"""
Expand Down
Loading