Skip to content

GH-128520: Make pathlib._abc.WritablePath a sibling of ReadablePath #129014

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 90 additions & 46 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

import functools
import io
import operator
import posixpath
from errno import EINVAL
Expand Down Expand Up @@ -41,6 +42,40 @@ def _explode_path(path):
return path, names


def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
"""
try:
return io.open(path, mode, buffering, encoding, errors, newline)
except TypeError:
pass
cls = type(path)
text = 'b' not in mode
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
if text:
try:
attr = getattr(cls, f'__open_{mode}__')
except AttributeError:
pass
else:
return attr(path, buffering, encoding, errors, newline)

try:
attr = getattr(cls, f'__open_{mode}b__')
except AttributeError:
pass
else:
stream = attr(path, buffering)
if text:
stream = io.TextIOWrapper(stream, encoding, errors, newline)
return stream

raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")


class PathGlobber(_GlobberBase):
"""
Class providing shell-style globbing for path objects.
Expand All @@ -58,35 +93,15 @@ def concat_path(path, text):

class CopyReader:
"""
Class that implements copying between path objects. An instance of this
class is available from the ReadablePath.copy property; it's made callable
so that ReadablePath.copy() can be treated as a method.

The target path's CopyWriter drives the process from its _create() method.
Files and directories are exchanged by calling methods on the source and
target paths, and metadata is exchanged by calling
source.copy._read_metadata() and target.copy._write_metadata().
Class that implements the "read" part of copying between path objects.
An instance of this class is available from the ReadablePath._copy_reader
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not isinstance(target, ReadablePath):
target = self._path.with_segments(target)

# Delegate to the target path's CopyWriter object.
try:
create = target.copy._create
except AttributeError:
raise TypeError(f"Target is not writable: {target}") from None
return create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)

_readable_metakeys = frozenset()

def _read_metadata(self, metakeys, *, follow_symlinks=True):
Expand All @@ -96,8 +111,16 @@ def _read_metadata(self, metakeys, *, follow_symlinks=True):
raise NotImplementedError


class CopyWriter(CopyReader):
__slots__ = ()
class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

_writable_metakeys = frozenset()

Expand All @@ -110,7 +133,7 @@ def _write_metadata(self, metadata, *, follow_symlinks=True):
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source.copy._readable_metakeys
metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
Expand All @@ -128,22 +151,22 @@ def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst.copy._create_symlink(src, metakeys)
dst._copy_writer._create_symlink(src, metakeys)
elif src.is_dir():
dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst.copy._create_file(src, metakeys)
dst._copy_writer._create_file(src, metakeys)
if metakeys:
metadata = source.copy._read_metadata(metakeys)
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with source.open('rb') as source_f:
with magic_open(source, 'rb') as source_f:
try:
with self._path.open('wb') as target_f:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
Expand All @@ -152,15 +175,15 @@ def _create_file(self, source, metakeys):
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source.copy._read_metadata(metakeys)
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

Expand Down Expand Up @@ -420,26 +443,25 @@ def is_symlink(self):
"""
raise NotImplementedError

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
def __open_rb__(self, buffering=-1):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
Open the file pointed to by this path for reading in binary mode and
return a file object, like open(mode='rb').
"""
raise NotImplementedError

def read_bytes(self):
"""
Open the file in bytes mode, read it, and close the file.
"""
with self.open(mode='rb', buffering=0) as f:
with magic_open(self, mode='rb', buffering=0) as f:
return f.read()

def read_text(self, encoding=None, errors=None, newline=None):
"""
Open the file in text mode, read it, and close the file.
"""
with self.open(mode='r', encoding=encoding, errors=errors, newline=newline) as f:
with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
return f.read()

def _scandir(self):
Expand Down Expand Up @@ -532,7 +554,22 @@ def readlink(self):
"""
raise NotImplementedError

copy = property(CopyReader, doc=CopyReader.__call__.__doc__)
_copy_reader = property(CopyReader)

def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not hasattr(target, '_copy_writer'):
target = self.with_segments(target)

# Delegate to the target path's CopyWriter object.
try:
create = target._copy_writer._create
except AttributeError:
raise TypeError(f"Target is not writable: {target}") from None
return create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)

def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
Expand All @@ -542,7 +579,7 @@ def copy_into(self, target_dir, *, follow_symlinks=True,
name = self.name
if not name:
raise ValueError(f"{self!r} has an empty name")
elif isinstance(target_dir, ReadablePath):
elif hasattr(target_dir, '_copy_writer'):
target = target_dir / name
else:
target = self.with_segments(target_dir, name)
Expand All @@ -551,7 +588,7 @@ def copy_into(self, target_dir, *, follow_symlinks=True,
preserve_metadata=preserve_metadata)


class WritablePath(ReadablePath):
class WritablePath(JoinablePath):
__slots__ = ()

def symlink_to(self, target, target_is_directory=False):
Expand All @@ -567,13 +604,20 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
raise NotImplementedError

def __open_wb__(self, buffering=-1):
"""
Open the file pointed to by this path for writing in binary mode and
return a file object, like open(mode='wb').
"""
raise NotImplementedError

def write_bytes(self, data):
"""
Open the file in bytes mode, write to it, and close the file.
"""
# type-check for the buffer interface before truncating the file
view = memoryview(data)
with self.open(mode='wb') as f:
with magic_open(self, mode='wb') as f:
return f.write(view)

def write_text(self, data, encoding=None, errors=None, newline=None):
Expand All @@ -583,7 +627,7 @@ def write_text(self, data, encoding=None, errors=None, newline=None):
if not isinstance(data, str):
raise TypeError('data must be str, not %s' %
data.__class__.__name__)
with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
return f.write(data)

copy = property(CopyWriter, doc=CopyWriter.__call__.__doc__)
_copy_writer = property(CopyWriter)
Loading
Loading