Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimized copyfile #37

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ install_requires=
[options.extras_require]
tests =
pytest==7.1.2
pytest-benchmark==3.4.1
pytest-sugar==0.9.4
pytest-cov==3.0.0
pytest-mock==3.7.0
Expand Down
160 changes: 160 additions & 0 deletions src/dvc_objects/fs/fastcopy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import errno
import os
import sys
from shutil import _fastcopy_fcopyfile as _fcopyfile # type: ignore
from shutil import _fastcopy_sendfile as _sendfile # type: ignore
from shutil import _GiveupOnFastCopy # type: ignore
from shutil import copyfileobj as _copyfileobj_shutil

try:
import posix # type:ignore
except ImportError:
posix = None # type: ignore


_WINDOWS = os.name == "nt"
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 2**20
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
_USE_CP_COPY_FILE_RANGE = hasattr(os, "copy_file_range")


def _determine_linux_fastcopy_blocksize(infd):
"""Determine blocksize for fastcopying on Linux.
Hopefully the whole file will be copied in a single call.
The copying itself should be performed in a loop 'till EOF is
reached (0 return) so a blocksize smaller or bigger than the actual
file size should not make any difference, also in case the file
content changes while being copied.
"""
try:
blocksize = max(os.fstat(infd).st_size, 2**23) # min 8 MiB
except OSError:
blocksize = 2**27 # 128 MiB
# On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
# see gh-82500.
if sys.maxsize < 2**32:
blocksize = min(blocksize, 2**30)
return blocksize


def _copy_file_range(fsrc, fdst):
"""Copy data from one regular mmap-like fd to another by using
a high-performance copy_file_range(2) syscall that gives filesystems
an opportunity to implement the use of reflinks or server-side copy.
This should work on Linux >= 4.5 only.

See https://github.com/python/cpython/pull/93152.
"""
try:
infd = fsrc.fileno()
outfd = fdst.fileno()
except Exception as err:
raise _GiveupOnFastCopy(err) # not a regular file

blocksize = _determine_linux_fastcopy_blocksize(infd)
offset = 0
while True:
try:
n_copied = os.copy_file_range( # pylint: disable=no-member
infd, outfd, blocksize, offset_dst=offset
)
except OSError as err:
# ...in order to have a more informative exception.
err.filename = fsrc.name
err.filename2 = fdst.name

if err.errno == errno.ENOSPC: # filesystem is full
raise err from None

# Give up on first call and if no data was copied.
if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
raise _GiveupOnFastCopy(err)

raise err
else:
if n_copied == 0:
# If no bytes have been copied yet, copy_file_range
# might silently fail.
# https://lore.kernel.org/linux-fsdevel/20210126233840.GG4626@dread.disaster.area/T/#m05753578c7f7882f6e9ffe01f981bc223edef2b0
if offset == 0:
raise _GiveupOnFastCopy()
break
offset += n_copied


def _copyfileobj_readinto(fsrc, fdst, callback=None, length=COPY_BUFSIZE):
"""readinto()/memoryview() based variant of copyfileobj().
*fsrc* must support readinto() method and both files must be
open in binary mode.
"""
# Localize variable access to minimize overhead.
fsrc_readinto = fsrc.readinto
fdst_write = fdst.write
with memoryview(bytearray(length)) as mv:
while n := fsrc_readinto(mv):
if callback:
callback.relative_update(n)
if n >= length:
fdst_write(mv)
continue

with mv[:n] as smv:
fdst.write(smv)


def _copyfileobj(fsrc, fdst, callback=None, length=COPY_BUFSIZE):
file_size = os.fstat(fsrc.fileno()).st_size
if callback:
callback.set_size(file_size)

if _WINDOWS and file_size > 0:
# Windows, see:
# https://github.com/python/cpython/pull/7160#discussion_r195405230
return _copyfileobj_readinto(
fsrc, fdst, callback, min(file_size, length)
)

wrapped = callback.wrap_attr(fsrc) if callback else fsrc
_copyfileobj_shutil(wrapped, fdst, length=length)


def _copyfile(fsrc, fdst, callback):
if _HAS_FCOPYFILE: # macOS
try:
# pylint: disable=protected-access, no-member
return _fcopyfile(
fsrc,
fdst,
posix._COPYFILE_DATA,
)
except _GiveupOnFastCopy:
pass

if _USE_CP_COPY_FILE_RANGE:
try:
return _copy_file_range(fsrc, fdst)
except _GiveupOnFastCopy:
pass

if _USE_CP_SENDFILE:
try:
return _sendfile(fsrc, fdst)
except _GiveupOnFastCopy:
pass

return _copyfileobj(fsrc, fdst, callback)


def copyfile(src, dst, *, callback=None, copy_function=_copyfile):
from .callbacks import Callback

with open(src, "rb") as fsrc:
try:
with open(dst, "wb") as fdst, Callback.as_callback(callback) as cb:
copy_function(fsrc, fdst, cb)
return dst
except IsADirectoryError as e:
if os.path.exists(dst):
raise
raise FileNotFoundError(f"Directory does not exist: {dst}") from e
41 changes: 16 additions & 25 deletions src/dvc_objects/fs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import TYPE_CHECKING, Iterator

from . import system
from .fastcopy import copyfile as fast_copyfile

if TYPE_CHECKING:
from .base import AnyFSPath, FileSystem
Expand Down Expand Up @@ -143,39 +144,29 @@ def copyfile(
callback: "Callback" = None,
no_progress_bar: bool = False,
name: str = None,
reflink: bool = False,
) -> None:
"""Copy file with progress bar"""
name = name if name else os.path.basename(dest)
total = os.stat(src).st_size

if os.path.isdir(dest):
dest = os.path.join(dest, os.path.basename(src))

if callback:
callback.set_size(total)
if reflink:
try:
return system.reflink(src, dest)
except OSError:
pass

try:
system.reflink(src, dest)
except OSError:
from .callbacks import Callback

with open(src, "rb") as fsrc, open(dest, "wb+") as fdest:
with Callback.as_tqdm_callback(
callback,
size=total,
bytes=True,
disable=no_progress_bar,
desc=name,
) as cb:
wrapped = cb.wrap_attr(fdest, "write")
while True:
buf = fsrc.read(LOCAL_CHUNK_SIZE)
if not buf:
break
wrapped.write(buf)

if callback:
callback.absolute_update(total)
from .callbacks import Callback

with Callback.as_tqdm_callback(
callback,
bytes=True,
disable=no_progress_bar,
desc=name,
) as cb:
fast_copyfile(src, dest, callback=cb)


def tmp_fname(fname: "AnyFSPath" = "") -> "AnyFSPath":
Expand Down
Empty file added tests/benchmarks/__init__.py
Empty file.
132 changes: 132 additions & 0 deletions tests/benchmarks/test_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import os
import shutil
from functools import partial
from pathlib import Path

import pytest

from dvc_objects.fs import fastcopy, system
from dvc_objects.fs.utils import human_readable_to_bytes, remove

try:
import posix # type: ignore
except ImportError:
posix = None # type: ignore


TEST_DIR = Path(__file__).resolve().parents[1] / "copy-test-dir"
FILE_SIZES = [
"1KB",
"10KB",
"100KB",
"1MB",
"10MB",
"100MB",
"1GB",
# "2GB",
# "5GB",
# "10GB",
]


def write_random_data(
path: Path, size: int, chunk_size: int = 1024 * 1024 * 1024
) -> None:
TEST_DIR.mkdir(exist_ok=True, parents=True)
try:
with path.open("wb") as fobj:
while size > chunk_size:
fobj.write(os.urandom(chunk_size))
size -= chunk_size
fobj.write(os.urandom(size))
except: # noqa: E722, B001
remove(os.fspath(path))
raise


def copyfile_length(src, dst, length=0):
with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
shutil.copyfileobj(fsrc, fdst, length=0)


def copyfile_read(src, dst):
with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
fastcopy._copyfileobj(fsrc, fdst)


def copyfile_readinto(src, dst):
with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
file_size = os.fstat(fsrc.fileno()).st_size
return fastcopy._copyfileobj_readinto(
fsrc, fdst, length=min(file_size, fastcopy.COPY_BUFSIZE)
)


def copyfile_range(src, dst):
with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
fastcopy._copy_file_range(fsrc, fdst)


def copyfile_reflink(src, dst):
return system.reflink(src, dst)


def copyfile_sendfile(src, dst):
with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
fastcopy._sendfile(fsrc, fdst)


def copyfile_fcopyfile(src, dst):
with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
fastcopy._fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)


COPY_FUNCTIONS = {
"fastcopy": fastcopy.copyfile, # platform-specific copy
"shutil_copy": shutil.copyfile,
"read": copyfile_read, # read-write
"readinto": copyfile_readinto,
"64k": partial(copyfile_length, length=64 * 1024),
# "128k": partial(copyfile_length, length=128 * 1024),
# "256k": partial(copyfile_length, length=256 * 1024),
# "512k": partial(copyfile_length, length=512 * 1024),
# "1M": partial(copyfile_length, length=1024 * 1024),
# "4M": partial(copyfile_length, length=4 * 1024 * 1024),
# "10M": partial(copyfile_length, length=10 * 1024 * 1024),
# "100M": partial(copyfile_length, length=100 * 1024 * 1024),
}

if posix and fastcopy._HAS_FCOPYFILE:
COPY_FUNCTIONS["fcopyfile"] = copyfile_fcopyfile

if fastcopy._USE_CP_COPY_FILE_RANGE:
COPY_FUNCTIONS["copy_file_range"] = copyfile_range

if fastcopy._USE_CP_SENDFILE:
COPY_FUNCTIONS["sendfile"] = copyfile_sendfile

COPY_FUNCTIONS["reflink"] = pytest.param(
copyfile_reflink, marks=pytest.mark.xfail(raises=OSError)
)


@pytest.mark.parametrize("hsize", FILE_SIZES)
@pytest.mark.parametrize(
"copy_function", COPY_FUNCTIONS.values(), ids=COPY_FUNCTIONS.keys()
)
def test_sendfile(request, benchmark, copy_function, hsize):
src = TEST_DIR / f"orig-{hsize}"
dst = TEST_DIR / f"dup-{hsize}"
if not src.exists():
write_random_data(src, human_readable_to_bytes(hsize))
request.addfinalizer(partial(remove, os.fspath(dst)))

benchmark(copy_function, src, dst)
assert dst.stat().st_size == src.stat().st_size


if __name__ == "__main__":
for hsize in FILE_SIZES:
size = human_readable_to_bytes(hsize)
write_random_data(TEST_DIR / f"orig-{hsize}", size)
print(hsize)