Skip to content

GH-89727: Add pathlib.Path.fwalk() method #103566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions Doc/library/pathlib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,53 @@ call fails (for example because the path doesn't exist).

.. versionadded:: 3.12

.. method:: Path.fwalk(top_down=True, *, on_error=None, follow_symlinks=False, dir_fd=None)

This behaves exactly like :meth:`~Path.walk`, except that it yields a
4-tuple ``(dirpath, dirnames, filenames, dirfd)``, and it supports ``dir_fd``.

*dirpath*, *dirnames* and *filenames* are identical to :meth:`~Path.walk`
output, and *dirfd* is a file descriptor referring to the directory
*dirpath*.

.. note::
Since :meth:`~Path.fwalk` yields file descriptors, those are only valid
until the next iteration step, so you should duplicate them (e.g. with
:func:`os.dup`) if you want to keep them longer.

This example displays the number of bytes used by all files in each directory,
while ignoring ``__pycache__`` directories::

import os
from pathlib import Path
for root, dirs, files, rootfd in Path("cpython/Lib/concurrent").fwalk(on_error=print):
print(
root,
"consumes",
sum(os.stat(name, dir_fd=rootfd).st_size for name in files),
"bytes in",
len(files),
"non-directory files"
)
if '__pycache__' in dirs:
dirs.remove('__pycache__')

This next example is a simple implementation of :func:`shutil.rmtree`.
Walking the tree bottom-up is essential as :func:`os.rmdir` doesn't allow
deleting a directory before it is empty::

# Delete everything reachable from the directory "top".
# CAUTION: This is dangerous! For example, if top == Path('/'),
# it could delete all of your files.
for root, dirs, files, rootfd in top.fwalk(top_down=False):
for name in files:
os.unlink(name, dir_fd=rootfd)
for name in dirs:
os.rmdir(name, dir_fd=rootfd)

.. availability:: Unix.

.. versionadded:: 3.12

.. method:: Path.lchmod(mode)

Like :meth:`Path.chmod` but, if the path points to a symbolic link, the
Expand Down
5 changes: 3 additions & 2 deletions Doc/whatsnew/3.12.rst
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,9 @@ pathlib
Subclasses may override the :meth:`~pathlib.PurePath.with_segments` method
to pass information between path instances.

* Add :meth:`~pathlib.Path.walk` for walking the directory trees and generating
all file or directory names within them, similar to :func:`os.walk`.
* Add :meth:`~pathlib.Path.walk` and :meth:`~pathlib.Path.fwalk`
for walking the directory trees and generating all file or directory names
within them, similar to :func:`os.walk` and :func:`~os.fwalk`.
(Contributed by Stanislav Zmiev in :gh:`90385`.)

* Add *walk_up* optional parameter to :meth:`pathlib.PurePath.relative_to`
Expand Down
128 changes: 87 additions & 41 deletions Lib/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,58 @@ def _select_from(self, parent_path, scandir):
yielded.clear()


class _WalkAction:
WALK = object()
YIELD = object()
CLOSE = object()


def _walk(top_down, on_error, follow_symlinks, use_fd, actions):
    """Drive a directory walk from an explicit stack of pending actions.

    *actions* is a list of ``(action, value)`` pairs consumed from the end
    (LIFO).  ``action`` is one of the ``_WalkAction`` sentinels:

    * ``WALK``  -- ``value`` is ``(path, dir_fd, entry)``; scan that directory.
    * ``YIELD`` -- ``value`` is a finished result tuple; yield it.
    * ``CLOSE`` -- ``value`` is a file descriptor; close it.

    Yields ``(path, dirnames, filenames)`` tuples, with a fourth ``fd``
    element when *use_fd* is true.  Any ``OSError`` raised while processing
    an action is passed to *on_error* when it is not None and is otherwise
    discarded; the walk then continues with the next action.
    """
    # NOTE(review): indentation was reconstructed from syntax; confirm that
    # the ``if top_down:`` section sits after (not inside) the ``with`` block.
    while actions:
        action, value = actions.pop()
        try:
            if action is _WalkAction.WALK:
                path, dir_fd, entry = value
                dirnames = []
                filenames = []
                if use_fd:
                    # _scandir_fwalk() opens the directory and pushes a CLOSE
                    # action for the new fd onto *actions* before returning.
                    scandir, fd = path._scandir_fwalk(
                        follow_symlinks, actions, dir_fd, entry)
                    result = path, dirnames, filenames, fd
                else:
                    scandir, fd = path._scandir, None
                    result = path, dirnames, filenames
                with scandir() as scandir_it:
                    if not top_down:
                        # Pushed before the children so that, with LIFO
                        # popping, every child is processed before this
                        # directory's own result is yielded.
                        actions.append((_WalkAction.YIELD, result))
                    # Rebinds ``entry``; the value unpacked above has already
                    # been handed to _scandir_fwalk().
                    for entry in scandir_it:
                        try:
                            if entry.is_dir(follow_symlinks=follow_symlinks):
                                if not top_down:
                                    # The scandir entry is forwarded only in
                                    # fd mode without symlink following, where
                                    # _scandir_fwalk() calls entry.stat().
                                    actions.append((_WalkAction.WALK, (
                                        path._make_child_relpath(entry.name), fd,
                                        entry if use_fd and not follow_symlinks else None)))
                                dirnames.append(entry.name)
                            else:
                                filenames.append(entry.name)
                        except OSError:
                            # An entry that raised OSError in the block above
                            # is reported as a non-directory.
                            filenames.append(entry.name)
                if top_down:
                    yield result
                    # ``dirnames`` is read only after the yield, so in-place
                    # edits made by the consumer decide what gets walked.
                    # Reversed so the first name is popped first.
                    for dirname in reversed(dirnames):
                        actions.append((_WalkAction.WALK, (
                            path._make_child_relpath(dirname), fd, None)))
            elif action is _WalkAction.YIELD:
                yield value
            elif action is _WalkAction.CLOSE:
                os.close(value)
            else:
                raise AssertionError(f"unknown walk action: {action}")
        except OSError as error:
            if on_error is not None:
                on_error(error)


#
# Public API
#
Expand Down Expand Up @@ -1040,47 +1092,8 @@ def rglob(self, pattern, *, case_sensitive=None):
def walk(self, top_down=True, on_error=None, follow_symlinks=False):
    """Walk the directory tree from this directory, similar to os.walk()."""
    sys.audit("pathlib.Path.walk", self, on_error, follow_symlinks)
    # NOTE(review): this text comes from a diff view whose +/- markers and
    # indentation were lost.  The ``paths`` loop below looks like the removed
    # implementation and the final two statements like its replacement, which
    # delegates to _walk(); confirm against the real file before relying on
    # the layout shown here.
    paths = [self]

    while paths:
        path = paths.pop()
        # A tuple on the stack is a deferred bottom-up result, not a path.
        if isinstance(path, tuple):
            yield path
            continue

        # We may not have read permission for self, in which case we can't
        # get a list of the files the directory contains. os.walk()
        # always suppressed the exception in that instance, rather than
        # blow up for a minor reason when (say) a thousand readable
        # directories are still left to visit. That logic is copied here.
        try:
            scandir_it = path._scandir()
        except OSError as error:
            if on_error is not None:
                on_error(error)
            continue

        with scandir_it:
            dirnames = []
            filenames = []
            for entry in scandir_it:
                try:
                    is_dir = entry.is_dir(follow_symlinks=follow_symlinks)
                except OSError:
                    # Carried over from os.path.isdir().
                    is_dir = False

                if is_dir:
                    dirnames.append(entry.name)
                else:
                    filenames.append(entry.name)

        if top_down:
            yield path, dirnames, filenames
        else:
            # Defer this directory's result until its children are done.
            paths.append((path, dirnames, filenames))

        # Reversed so that pop() visits children in ``dirnames`` order.
        paths += [path._make_child_relpath(d) for d in reversed(dirnames)]
    actions = [(_WalkAction.WALK, (self, None, None))]
    return _walk(top_down, on_error, follow_symlinks, False, actions)

def __init__(self, *args, **kwargs):
if kwargs:
Expand Down Expand Up @@ -1337,6 +1350,39 @@ def expanduser(self):

return self

if {os.stat, os.open} <= os.supports_dir_fd and {os.stat, os.scandir} <= os.supports_fd:
def fwalk(self, top_down=True, *, on_error=None, follow_symlinks=False, dir_fd=None):
    """Walk the directory tree from this directory, similar to os.fwalk().

    Returns an iterator of ``(dirpath, dirnames, filenames, dirfd)``
    tuples produced by ``_walk()`` in file-descriptor mode.  *top_down*,
    *on_error* and *follow_symlinks* are forwarded to ``_walk()`` unchanged;
    *dir_fd* is the descriptor the starting directory is opened relative to.

    Every descriptor opened during the walk is registered as a CLOSE action
    on the shared action stack.  Descriptors still pending when the iterator
    is exhausted, closed, garbage-collected or fails are closed then.
    """
    sys.audit("pathlib.Path.fwalk", self, on_error, follow_symlinks, dir_fd)
    actions = [(_WalkAction.WALK, (self, dir_fd, None))]

    def walk_then_close():
        # The cleanup has to live inside a generator.  _walk() is itself a
        # generator function, so a ``try/finally`` wrapped around a plain
        # ``return _walk(...)`` would run its ``finally`` before the first
        # directory is opened and never see the descriptors opened later.
        try:
            yield from _walk(top_down, on_error, follow_symlinks, True, actions)
        finally:
            for action, value in reversed(actions):
                if action is _WalkAction.CLOSE:
                    try:
                        os.close(value)
                    except OSError:
                        # Best-effort cleanup: keep closing the rest.
                        pass

    return walk_then_close()

def _scandir_fwalk(self, follow_symlinks, actions, dir_fd, entry):
    """Open this directory and return ``(scandir_factory, fd)``.

    The directory is opened read-only relative to *dir_fd*, and a CLOSE
    action for the new descriptor is appended to *actions*.  When
    *follow_symlinks* is false, the pre-open stat (taken from *entry* when
    one is given) must match the stat of the opened descriptor; otherwise
    NotADirectoryError is raised.  The returned zero-argument callable runs
    ``os.scandir()`` on the descriptor.
    """
    # NOTE(review): with a caller-supplied dir_fd only the final component
    # (self.name) is opened; verify this is intended for multi-part paths.
    target = self.name if dir_fd is not None else self

    check_symlink = not follow_symlinks
    if check_symlink:
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        if entry is not None:
            before_open = entry.stat(follow_symlinks=False)
        else:
            before_open = os.stat(target, follow_symlinks=False, dir_fd=dir_fd)

    fd = os.open(target, os.O_RDONLY, dir_fd=dir_fd)
    # Register the descriptor before the check below so it is still closed
    # when the check raises.
    actions.append((_WalkAction.CLOSE, fd))

    if check_symlink:
        after_open = os.stat(fd)
        if not os.path.samestat(before_open, after_open):
            raise NotADirectoryError("Cannot walk into a symbolic link")

    def scan_opened_fd():
        return os.scandir(fd)

    return scan_opened_fd, fd


class PosixPath(Path, PurePosixPath):
"""Path subclass for non-Windows systems.
Expand Down
80 changes: 69 additions & 11 deletions Lib/test/test_pathlib.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import collections.abc
import io
import itertools
import os
import sys
import errno
Expand Down Expand Up @@ -2680,6 +2681,9 @@ def test_passing_kwargs_deprecated(self):

class WalkTests(unittest.TestCase):

def walk(self, top, **kwargs):
    """Walk *top* with the API under test, forwarding *kwargs* unchanged."""
    walker = top.walk(**kwargs)
    return walker

def setUp(self):
self.addCleanup(os_helper.rmtree, os_helper.TESTFN)

Expand Down Expand Up @@ -2750,7 +2754,7 @@ def setUp(self):
del self.sub2_tree[1][:1]

def test_walk_topdown(self):
walker = self.walk_path.walk()
walker = self.walk(self.walk_path)
entry = next(walker)
entry[1].sort() # Ensure we visit SUB1 before SUB2
self.assertEqual(entry, (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Expand All @@ -2770,7 +2774,7 @@ def test_walk_prune(self, walk_path=None):
walk_path = self.walk_path
# Prune the search.
all = []
for root, dirs, files in walk_path.walk():
for root, dirs, files in self.walk(walk_path):
all.append((root, dirs, files))
if 'SUB1' in dirs:
# Note that this also mutates the dirs we appended to all!
Expand All @@ -2788,7 +2792,7 @@ def test_file_like_path(self):

def test_walk_bottom_up(self):
seen_testfn = seen_sub1 = seen_sub11 = seen_sub2 = False
for path, dirnames, filenames in self.walk_path.walk(top_down=False):
for path, dirnames, filenames in self.walk(self.walk_path, top_down=False):
if path == self.walk_path:
self.assertFalse(seen_testfn)
self.assertTrue(seen_sub1)
Expand Down Expand Up @@ -2821,7 +2825,7 @@ def test_walk_bottom_up(self):

@os_helper.skip_unless_symlink
def test_walk_follow_symlinks(self):
walk_it = self.walk_path.walk(follow_symlinks=True)
walk_it = self.walk(self.walk_path, follow_symlinks=True)
for root, dirs, files in walk_it:
if root == self.link_path:
self.assertEqual(dirs, [])
Expand All @@ -2834,23 +2838,23 @@ def test_walk_follow_symlinks(self):
def test_walk_symlink_location(self):
    # Tests whether symlinks end up in filenames or dirnames depending
    # on the `follow_symlinks` argument.
    walk_it = self.walk(self.walk_path, follow_symlinks=False)
    for root, dirs, files in walk_it:
        if root == self.sub2_path:
            self.assertIn("link", files)
            break
    else:
        self.fail("symlink not found")

    walk_it = self.walk(self.walk_path, follow_symlinks=True)
    for root, dirs, files in walk_it:
        if root == self.sub2_path:
            self.assertIn("link", dirs)
            break
    else:
        # Without this the second half would pass vacuously if the walk
        # never reached sub2_path.
        self.fail("symlink not found")

def test_walk_bad_dir(self):
errors = []
walk_it = self.walk_path.walk(on_error=errors.append)
walk_it = self.walk(self.walk_path, on_error=errors.append)
root, dirs, files = next(walk_it)
self.assertEqual(errors, [])
dir1 = 'SUB1'
Expand All @@ -2874,14 +2878,14 @@ def test_walk_many_open_files(self):
path = pathlib.Path(base, *(['d']*depth))
path.mkdir(parents=True)

iters = [base.walk(top_down=False) for _ in range(100)]
iters = [self.walk(base, top_down=False) for _ in range(100)]
for i in range(depth + 1):
expected = (path, ['d'] if i else [], [])
for it in iters:
self.assertEqual(next(it), expected)
path = path.parent

iters = [base.walk(top_down=True) for _ in range(100)]
iters = [self.walk(base, top_down=True) for _ in range(100)]
path = base
for i in range(depth + 1):
expected = (path, ['d'] if i < depth else [], [])
Expand All @@ -2898,8 +2902,62 @@ def test_walk_above_recursion_limit(self):
path.mkdir(parents=True)

with set_recursion_limit(recursion_limit):
list(base.walk())
list(base.walk(top_down=False))
list(self.walk(base))
list(self.walk(base, top_down=False))


@unittest.skipUnless(hasattr(pathlib.Path, 'fwalk'), "Test needs pathlib.Path.fwalk()")
class FwalkTests(WalkTests):
    """Tests for pathlib.Path.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the trailing descriptor so the inherited WalkTests cases
        # receive the same 3-tuples that Path.walk() produces.
        for root, dirs, files, root_fd in top.fwalk(**kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_top, walk_kwargs, fwalk_top, fwalk_kwargs):
        """
        compare with walk() results.

        For every combination of top_down and follow_symlinks, each root
        yielded by fwalk() must also have been yielded by walk(), with the
        same sets of directory and file names.
        """
        walk_top = pathlib.Path(walk_top)
        walk_kwargs = walk_kwargs.copy()
        fwalk_top = pathlib.Path(fwalk_top)
        fwalk_kwargs = fwalk_kwargs.copy()
        for top_down, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(top_down=top_down, follow_symlinks=follow_symlinks)
            fwalk_kwargs.update(top_down=top_down, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in walk_top.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in fwalk_top.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        self._compare_to_walk(os_helper.TESTFN, {}, os_helper.TESTFN, {})

    def test_dir_fd(self):
        # Open outside the ``try``: if os.open() fails there is no descriptor
        # to close, and ``fd`` must not be referenced while still unbound.
        fd = os.open(".", os.O_RDONLY)
        try:
            self._compare_to_walk(os_helper.TESTFN, {}, os_helper.TESTFN, {'dir_fd': fd})
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        p = pathlib.Path(os_helper.TESTFN)
        for top_down, follow_symlinks in itertools.product((True, False), repeat=2):
            for root, dirs, files, rootfd in p.fwalk(top_down, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None


class PathTest(_BasePathTest, unittest.TestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add :meth:`pathlib.Path.fwalk` for walking the directory trees with file
descriptors, similar to :func:`os.fwalk`.