Skip to content

GH-128520: Read path metadata from pathlib.types.PathInfo when copying #129841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 8 additions & 167 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
"""

import functools
import io
import posixpath
from errno import EINVAL
from glob import _PathGlobber, _no_recurse_symlinks
from pathlib._os import copyfileobj
from pathlib._os import Copier, magic_open


@functools.cache
Expand All @@ -41,162 +39,6 @@ def _explode_path(path):
return path, names


def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
"""
try:
return io.open(path, mode, buffering, encoding, errors, newline)
except TypeError:
pass
cls = type(path)
text = 'b' not in mode
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
if text:
try:
attr = getattr(cls, f'__open_{mode}__')
except AttributeError:
pass
else:
return attr(path, buffering, encoding, errors, newline)

try:
attr = getattr(cls, f'__open_{mode}b__')
except AttributeError:
pass
else:
stream = attr(path, buffering)
if text:
stream = io.TextIOWrapper(stream, encoding, errors, newline)
return stream

raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")


class CopyReader:
"""
Class that implements the "read" part of copying between path objects.
An instance of this class is available from the ReadablePath._copy_reader
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_readable_metakeys = frozenset()

def _read_metadata(self, metakeys, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError


class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_writable_metakeys = frozenset()

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError

def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, metakeys)
elif source.is_dir():
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
else:
self._create_file(source, metakeys)
return self._path

def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst._copy_writer._create_symlink(src, metakeys)
elif src.is_dir():
dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst._copy_writer._create_file(src, metakeys)
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, source):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if source == self._path:
err = OSError(EINVAL, "Source and target are the same path")
elif source in self._path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(source)
err.filename2 = str(self._path)
raise err


class JoinablePath:
"""Base class for pure path objects.

Expand Down Expand Up @@ -512,22 +354,21 @@ def readlink(self):
"""
raise NotImplementedError

_copy_reader = property(CopyReader)

def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not hasattr(target, '_copy_writer'):
if not hasattr(target, '_copier'):
target = self.with_segments(target)

# Delegate to the target path's CopyWriter object.
# Delegate to the target path's copier.
try:
create = target._copy_writer._create
copier = target._copier
except AttributeError:
raise TypeError(f"Target is not writable: {target}") from None
return create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)
copier.ensure_distinct_path(self, target)
return copier(preserve_metadata, follow_symlinks, dirs_exist_ok).copy(self, target)

def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
Expand All @@ -537,7 +378,7 @@ def copy_into(self, target_dir, *, follow_symlinks=True,
name = self.name
if not name:
raise ValueError(f"{self!r} has an empty name")
elif hasattr(target_dir, '_copy_writer'):
elif hasattr(target_dir, '_copier'):
target = target_dir / name
else:
target = self.with_segments(target_dir, name)
Expand Down Expand Up @@ -588,4 +429,4 @@ def write_text(self, data, encoding=None, errors=None, newline=None):
with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)

_copy_writer = property(CopyWriter)
_copier = Copier
Loading
Loading