Skip to content

GH-125413: Add private pathlib.Path method to write metadata #130238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from abc import ABC, abstractmethod
from glob import _PathGlobber, _no_recurse_symlinks
from pathlib import PurePath, Path
from pathlib._os import magic_open, CopyWriter
from pathlib._os import magic_open, ensure_distinct_paths, copy_file


def _explode_path(path):
Expand Down Expand Up @@ -347,13 +347,8 @@ def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
"""
if not hasattr(target, 'with_segments'):
target = self.with_segments(target)

# Delegate to the target path's CopyWriter object.
try:
create = target._copy_writer._create
except AttributeError:
raise TypeError(f"Target is not writable: {target}") from None
create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)
ensure_distinct_paths(self, target)
copy_file(self, target, follow_symlinks, dirs_exist_ok, preserve_metadata)
return target.joinpath() # Empty join to ensure fresh metadata.

def copy_into(self, target_dir, *, follow_symlinks=True,
Expand Down Expand Up @@ -424,7 +419,11 @@ def write_text(self, data, encoding=None, errors=None, newline=None):
with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)

_copy_writer = property(CopyWriter)
def _write_info(self, info, follow_symlinks=True):
"""
Write the given PathInfo to this path.
"""
pass


JoinablePath.register(PurePath)
Expand Down
23 changes: 13 additions & 10 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
except ImportError:
grp = None

from pathlib._os import LocalCopyWriter, PathInfo, DirEntryInfo, ensure_different_files
from pathlib._os import (
PathInfo, DirEntryInfo,
ensure_different_files, ensure_distinct_paths,
copy_file, copy_info,
)


__all__ = [
Expand Down Expand Up @@ -799,6 +803,12 @@ def write_text(self, data, encoding=None, errors=None, newline=None):
with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)

def _write_info(self, info, follow_symlinks=True):
"""
Write the given PathInfo to this path.
"""
copy_info(info, self, follow_symlinks=follow_symlinks)

_remove_leading_dot = operator.itemgetter(slice(2, None))
_remove_trailing_slash = operator.itemgetter(slice(-1))

Expand Down Expand Up @@ -1083,22 +1093,15 @@ def replace(self, target):
target = self.with_segments(target)
return target

_copy_writer = property(LocalCopyWriter)

def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not hasattr(target, 'with_segments'):
target = self.with_segments(target)

# Delegate to the target path's CopyWriter object.
try:
create = target._copy_writer._create
except AttributeError:
raise TypeError(f"Target is not writable: {target}") from None
create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)
ensure_distinct_paths(self, target)
copy_file(self, target, follow_symlinks, dirs_exist_ok, preserve_metadata)
return target.joinpath() # Empty join to ensure fresh metadata.

def copy_into(self, target_dir, *, follow_symlinks=True,
Expand Down
254 changes: 99 additions & 155 deletions Lib/pathlib/_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ def _sendfile(source_fd, target_fd):


if _winapi and hasattr(_winapi, 'CopyFile2'):
def copyfile(source, target):
def _copyfile2(source, target):
"""
Copy from one file to another using CopyFile2 (Windows only).
"""
_winapi.CopyFile2(source, target, 0)
else:
copyfile = None
_copyfile2 = None


def copyfileobj(source_f, target_f):
def _copyfileobj(source_f, target_f):
"""
Copy data from file-like object source_f to file-like object target_f.
"""
Expand Down Expand Up @@ -200,70 +200,6 @@ def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")


class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

def _copy_metadata(self, source, follow_symlinks=True):
"""Copy metadata from the given path to our path."""
pass

def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
ensure_distinct_paths(source, self._path)
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, preserve_metadata)
elif source.is_dir():
self._create_dir(source, follow_symlinks, dirs_exist_ok, preserve_metadata)
else:
self._create_file(source, preserve_metadata)
return self._path

def _create_dir(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst._copy_writer._create_symlink(src, preserve_metadata)
elif src.is_dir():
dst._copy_writer._create_dir(src, follow_symlinks, dirs_exist_ok, preserve_metadata)
else:
dst._copy_writer._create_file(src, preserve_metadata)

if preserve_metadata:
self._copy_metadata(source)

def _create_file(self, source, preserve_metadata):
"""Copy the given file to our path."""
ensure_different_files(source, self._path)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if preserve_metadata:
self._copy_metadata(source)

def _create_symlink(self, source, preserve_metadata):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if preserve_metadata:
self._copy_metadata(source, follow_symlinks=False)


def ensure_distinct_paths(source, target):
"""
Raise OSError(EINVAL) if the other path is within this path.
Expand All @@ -284,94 +220,6 @@ def ensure_distinct_paths(source, target):
raise err


class LocalCopyWriter(CopyWriter):
"""This object implements the "write" part of copying local paths. Don't
try to construct it yourself.
"""
__slots__ = ()

def _copy_metadata(self, source, follow_symlinks=True):
"""Copy metadata from the given path to our path."""
target = self._path
info = source.info

copy_times_ns = (
hasattr(info, '_access_time_ns') and
hasattr(info, '_mod_time_ns') and
(follow_symlinks or os.utime in os.supports_follow_symlinks))
if copy_times_ns:
t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)

# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
copy_xattrs = (
hasattr(info, '_xattrs') and
hasattr(os, 'setxattr') and
(follow_symlinks or os.setxattr in os.supports_follow_symlinks))
if copy_xattrs:
xattrs = info._xattrs(follow_symlinks=follow_symlinks)
for attr, value in xattrs:
try:
os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise

copy_posix_permissions = (
hasattr(info, '_posix_permissions') and
(follow_symlinks or os.chmod in os.supports_follow_symlinks))
if copy_posix_permissions:
posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
try:
os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass

copy_bsd_flags = (
hasattr(info, '_bsd_flags') and
hasattr(os, 'chflags') and
(follow_symlinks or os.chflags in os.supports_follow_symlinks))
if copy_bsd_flags:
bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
try:
os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise

if copyfile:
# Use fast OS routine for local file copying where available.
def _create_file(self, source, preserve_metadata):
"""Copy the given file to the given target."""
try:
source = os.fspath(source)
except TypeError:
super()._create_file(source, preserve_metadata)
else:
copyfile(source, os.fspath(self._path))

if os.name == 'nt':
# Windows: symlink target might not exist yet if we're copying several
# files, so ensure we pass is_dir to os.symlink().
def _create_symlink(self, source, preserve_metadata):
"""Copy the given symlink to the given target."""
self._path.symlink_to(source.readlink(), source.is_dir())
if preserve_metadata:
self._copy_metadata(source, follow_symlinks=False)


def ensure_different_files(source, target):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
Expand All @@ -394,6 +242,102 @@ def ensure_different_files(source, target):
raise err


def copy_file(source, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy the given source ReadablePath to the given target WritablePath.
"""
info = source.info
if not follow_symlinks and info.is_symlink():
target.symlink_to(source.readlink(), info.is_dir())
if preserve_metadata:
target._write_info(info, follow_symlinks=False)
elif info.is_dir():
children = source.iterdir()
target.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = target.joinpath(src.name)
copy_file(src, dst, follow_symlinks, dirs_exist_ok, preserve_metadata)
if preserve_metadata:
target._write_info(info)
else:
if _copyfile2:
# Use fast OS routine for local file copying where available.
try:
source_p = os.fspath(source)
target_p = os.fspath(target)
except TypeError:
pass
else:
_copyfile2(source_p, target_p)
return
ensure_different_files(source, target)
with magic_open(source, 'rb') as source_f:
with magic_open(target, 'wb') as target_f:
_copyfileobj(source_f, target_f)
if preserve_metadata:
target._write_info(info)


def copy_info(info, target, follow_symlinks=True):
"""Copy metadata from the given PathInfo to the given local path."""
copy_times_ns = (
hasattr(info, '_access_time_ns') and
hasattr(info, '_mod_time_ns') and
(follow_symlinks or os.utime in os.supports_follow_symlinks))
if copy_times_ns:
t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)

# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
copy_xattrs = (
hasattr(info, '_xattrs') and
hasattr(os, 'setxattr') and
(follow_symlinks or os.setxattr in os.supports_follow_symlinks))
if copy_xattrs:
xattrs = info._xattrs(follow_symlinks=follow_symlinks)
for attr, value in xattrs:
try:
os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise

copy_posix_permissions = (
hasattr(info, '_posix_permissions') and
(follow_symlinks or os.chmod in os.supports_follow_symlinks))
if copy_posix_permissions:
posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
try:
os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass

copy_bsd_flags = (
hasattr(info, '_bsd_flags') and
hasattr(os, 'chflags') and
(follow_symlinks or os.chflags in os.supports_follow_symlinks))
if copy_bsd_flags:
bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
try:
os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise


class _PathInfoBase:
__slots__ = ('_path', '_stat_result', '_lstat_result')

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Speed up :meth:`Path.copy <pathlib.Path.copy>` by making better use of
:attr:`~pathlib.Path.info` internally.
Loading