Skip to content

Accessor refactoring and CloudPath #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 39 additions & 65 deletions upath/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from typing import Union
import urllib
from urllib.parse import ParseResult

from fsspec.registry import (
get_filesystem_class,
Expand All @@ -15,76 +16,49 @@


class _FSSpecAccessor:
def __init__(self, parsed_url, *args, **kwargs):
self._url = parsed_url
cls = get_filesystem_class(self._url.scheme)
__slots__ = ("_fs",)

def __init__(self, parsed_url: ParseResult, **kwargs):
cls = get_filesystem_class(parsed_url.scheme)
url_kwargs = cls._get_kwargs_from_urls(
urllib.parse.urlunparse(self._url)
urllib.parse.urlunparse(parsed_url)
)
url_kwargs.update(kwargs)
self._fs = cls(**url_kwargs)

def transform_args_wrapper(self, func):
"""Modifies the arguments that get passed to the filesystem so that
the UPath instance gets stripped as the first argument. If a
path keyword argument is not given, then `UPath.path` is
formatted for the filesystem and inserted as the first argument.
If it is, then the path keyword argument is formatted properly for
the filesystem.
"""
def _format_path(self, path: "UPath") -> str:
return path.path

def wrapper(*args, **kwargs):
args, kwargs = self._transform_arg_paths(args, kwargs)
return func(*args, **kwargs)

return wrapper

def _transform_arg_paths(self, args, kwargs):
"""Formats the path properly for the filesystem backend."""
args = list(args)
first_arg = args.pop(0)
if not kwargs.get("path"):
if isinstance(first_arg, UPath):
first_arg = self._format_path(first_arg.path)
args.insert(0, first_arg)
args = tuple(args)
else:
kwargs["path"] = self._format_path(kwargs["path"])
return args, kwargs

def _format_path(self, s):
"""Placeholder method for subclassed filesystems"""
return s

def __getattribute__(self, item):
class_attrs = ["_url", "_fs", "__class__"]
if item in class_attrs:
return super().__getattribute__(item)

class_methods = [
"__init__",
"__getattribute__",
"transform_args_wrapper",
"_transform_arg_paths",
"_format_path",
]
if item in class_methods:
return lambda *args, **kwargs: getattr(self.__class__, item)(
self, *args, **kwargs
)
def open(self, path, *args, **kwargs):
return self._fs.open(self._format_path(path), *args, **kwargs)

d = object.__getattribute__(self, "__dict__")
fs = d.get("_fs", None)
if fs is not None:
method = getattr(fs, item, None)
if method:
return lambda *args, **kwargs: (
self.transform_args_wrapper(method)(*args, **kwargs)
) # noqa: E501
else:
raise NotImplementedError(
f"{fs.protocol} filesystem has no attribute {item}"
)
def stat(self, path, **kwargs):
return self._fs.stat(self._format_path(path), **kwargs)

def listdir(self, path, **kwargs):
return self._fs.listdir(self._format_path(path), **kwargs)

def glob(self, _path, path_pattern, **kwargs):
return self._fs.glob(self._format_path(path_pattern), **kwargs)

def exists(self, path, **kwargs):
return self._fs.exists(self._format_path(path), **kwargs)

def info(self, path, **kwargs):
return self._fs.info(self._format_path(path), **kwargs)

def rm(self, path, recursive, **kwargs):
return self._fs.rm(
self._format_path(path), recursive=recursive, **kwargs
)

def mkdir(self, path, create_parents=True, **kwargs):
return self._fs.mkdir(
self._format_path(path), create_parents=create_parents, **kwargs
)

def touch(self, path, **kwargs):
return self._fs.touch(self._format_path(path), **kwargs)


class UPath(pathlib.Path):
Expand Down Expand Up @@ -246,8 +220,8 @@ def relative_to(self, *other):
return output

def glob(self, pattern):
path = self.joinpath(pattern)
for name in self._accessor.glob(self, path=path.path):
path_pattern = self.joinpath(pattern)
for name in self._accessor.glob(self, path_pattern):
name = self._sub_path(name)
name = name.split(self._flavour.sep)
yield self._make_child(name)
Expand Down
30 changes: 17 additions & 13 deletions upath/implementations/gcs.py → upath/implementations/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@
import re


class _GCSAccessor(upath.core._FSSpecAccessor):
def __init__(self, parsed_url, *args, **kwargs):
super().__init__(parsed_url, *args, **kwargs)

def _format_path(self, s):
class _CloudAccessor(upath.core._FSSpecAccessor):
def _format_path(self, path):
"""
netloc has already been set to project via `GCSPath._from_parts`
netloc has already been set to project via `CloudPath._from_parts`
"""
s = f"{self._url.netloc}/{s.lstrip('/')}"
return s
return f"{path._url.netloc}/{path.path.lstrip('/')}"


# project is not part of the path, but is part of the credentials
class GCSPath(upath.core.UPath):
_default_accessor = _GCSAccessor
class CloudPath(upath.core.UPath):
_default_accessor = _CloudAccessor

@classmethod
def _from_parts(cls, args, url=None, **kwargs):
Expand All @@ -36,9 +32,9 @@ def _from_parsed_parts(cls, drv, root, parts, url=None, **kwargs):

def _sub_path(self, name):
"""
`gcsfs` returns the full path as `<bucket>/<path>` with `listdir` and
`glob`. However, in `iterdir` and `glob` we only want the relative path
to `self`.
`gcsfs` and `s3fs` return the full path as `<bucket>/<path>` with
`listdir` and `glob`. However, in `iterdir` and `glob` we only want the
relative path to `self`.
"""
sp = self.path
subed = re.sub(f"^({self._url.netloc})?/?({sp}|{sp[1:]})/?", "", name)
Expand All @@ -57,3 +53,11 @@ def joinpath(self, *args):
bucket = args_list.pop(0)
self._kwargs["bucket"] = bucket
return super().joinpath(*tuple(args_list))


class GCSPath(CloudPath):
pass


class S3Path(CloudPath):
pass
18 changes: 3 additions & 15 deletions upath/implementations/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,9 @@ def __init__(self, parsed_url, *args, **kwargs):
super().__init__(parsed_url, *args, **kwargs)
self._fs.root_marker = "/"

def transform_args_wrapper(self, func):
"""If arguments are passed to the wrapped function, and if the first
argument is a UPath instance, that argument is replaced with
the UPath's path attribute
"""

def wrapper(*args, **kwargs):
args, kwargs = self._transform_arg_paths(args, kwargs)
if "trunicate" in kwargs:
kwargs.pop("trunicate")
if func.__name__ == "mkdir":
args = args[:1]
return func(*args, **kwargs)

return wrapper
def touch(self, **kwargs):
kwargs.pop("trunicate", None)
super().touch(self, **kwargs)


class HDFSPath(upath.core.UPath):
Expand Down
26 changes: 2 additions & 24 deletions upath/implementations/http.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,12 @@
import urllib

import upath.core


class _HTTPAccessor(upath.core._FSSpecAccessor):
def __init__(self, parsed_url, *args, **kwargs):
super().__init__(parsed_url, *args, **kwargs)

def transform_args_wrapper(self, func):
"""if arguments are passed to the wrapped function, and if the first
argument is a UPath instance, that argument is replaced with
the UPath's path attribute
"""

def wrapper(*args, **kwargs):
if args:
args = list(args)
first_arg = args.pop(0)
if not kwargs.get("path"):
if isinstance(first_arg, upath.core.UPath):
first_arg = str(first_arg)
args.insert(0, first_arg)
args = tuple(args)
else:
new_url = self._url._replace(path=kwargs["path"])
unparsed = urllib.parse.urlunparse(new_url)
kwargs["path"] = unparsed
return func(*args, **kwargs)

return wrapper
def _format_path(self, path):
return str(path)


class HTTPPath(upath.core.UPath):
Expand Down
60 changes: 0 additions & 60 deletions upath/implementations/s3.py

This file was deleted.

10 changes: 5 additions & 5 deletions upath/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@


class _Registry:
from upath.implementations import hdfs, http, memory, s3, gcs
from upath.implementations import hdfs, http, memory, cloud

known_implementations: Dict[str, Type[UPath]] = {
"https": http.HTTPPath,
"http": http.HTTPPath,
"hdfs": hdfs.HDFSPath,
"s3a": s3.S3Path,
"s3": s3.S3Path,
"s3a": cloud.S3Path,
"s3": cloud.S3Path,
"memory": memory.MemoryPath,
"gs": gcs.GCSPath,
"gcs": gcs.GCSPath,
"gs": cloud.GCSPath,
"gcs": cloud.GCSPath,
}

def __getitem__(self, item):
Expand Down
2 changes: 1 addition & 1 deletion upath/tests/implementations/test_gcs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest

from upath import UPath
from upath.implementations.gcs import GCSPath
from upath.implementations.cloud import GCSPath
from upath.errors import NotDirectoryError
from ..cases import BaseTests
from ..utils import skip_on_windows
Expand Down
2 changes: 1 addition & 1 deletion upath/tests/implementations/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from upath import UPath
from upath.errors import NotDirectoryError
from upath.implementations.s3 import S3Path
from upath.implementations.cloud import S3Path
from ..cases import BaseTests


Expand Down
2 changes: 1 addition & 1 deletion upath/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest
from upath import UPath
from upath.implementations.s3 import S3Path
from upath.implementations.cloud import S3Path
from .cases import BaseTests
from .utils import only_on_windows, skip_on_windows

Expand Down