Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/python-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ jobs:
PYTEST_ADDOPTS: "--color=yes"
run: poetry run pytest

- name: Static type check
run: poetry run mypy

- name: Upload coverage
uses: codecov/codecov-action@v1
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ repos:
- id: pyupgrade
args: ["--py36-plus"]

# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v0.971
# hooks:
# - id: mypy
# args: ["pathable"]

- repo: local
hooks:
- id: flynt
Expand Down
21 changes: 15 additions & 6 deletions pathable/accessors.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
"""Pathable accessors module"""
from contextlib import contextmanager
from typing import Any
from typing import Dict
from typing import Hashable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Union


class LookupAccessor:
"""Accessor for object that supports __getitem__ lookups"""

def __init__(self, lookup):
def __init__(self, lookup: Mapping[Hashable, Any]):
self.lookup = lookup

def stat(self, parts):
return NotImplementedError
def stat(self, parts: List[Hashable]) -> Dict[str, Any]:
raise NotImplementedError

def keys(self, parts):
def keys(self, parts: List[Hashable]) -> Any:
with self.open(parts) as d:
return d.keys()

def len(self, parts):
def len(self, parts: List[Hashable]) -> int:
with self.open(parts) as d:
return len(d)

@contextmanager
def open(self, parts):
def open(
self, parts: List[Hashable]
) -> Iterator[Union[Mapping[Hashable, Any], Any]]:
content = self.lookup
for part in parts:
content = content[part]
Expand Down
10 changes: 10 additions & 0 deletions pathable/dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dataclasses import dataclass
from typing import Hashable
from typing import List


@dataclass
class BasePathData:

parts: List[Hashable]
separator: str
24 changes: 15 additions & 9 deletions pathable/parsers.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
"""Pathable parsers module"""
from typing import Any
from typing import Hashable
from typing import List
from typing import Union

from pathable.types import PartType

def fsdecode(part):
if isinstance(part, bytes):
return part.decode("ascii")
return part
SEPARATOR = "/"


def parse_parts(parts, sep="/"):
parsed = []
def parse_parts(parts: List[PartType], sep: str = SEPARATOR) -> List[Hashable]:
"""Parse (filter and split) path parts."""
parsed: List[Hashable] = []
it = reversed(parts)
for part in it:
if isinstance(part, int):
parsed.append(part)
continue
if not part:
continue
if sep in part:
Expand All @@ -26,13 +30,15 @@ def parse_parts(parts, sep="/"):
return parsed


def parse_args(args, sep="/"):
parts = []
def parse_args(args: List[Any], sep: str = SEPARATOR) -> List[Hashable]:
"""Canonicalize path constructor arguments."""
parts: List[PartType] = []
for a in args:
if hasattr(a, "parts"):
parts += a.parts
else:
a = fsdecode(a)
if isinstance(a, bytes):
a = a.decode("ascii")
if isinstance(a, str):
parts.append(a)
elif isinstance(a, int):
Expand Down
141 changes: 88 additions & 53 deletions pathable/paths.py
Original file line number Diff line number Diff line change
@@ -1,184 +1,219 @@
"""Pathable paths module"""
from contextlib import contextmanager
from typing import Any
from typing import Hashable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Tuple

from pathable.accessors import LookupAccessor
from pathable.dataclasses import BasePathData
from pathable.parsers import SEPARATOR
from pathable.parsers import parse_args

SEPARATOR = "/"

class BasePath(BasePathData):
"""Base path."""

class BasePath:
"""Base path"""

def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any):
separator = kwargs.pop("separator", SEPARATOR)
self.parts = parse_args(args)
self.parts = parse_args(list(args))
self.separator = separator

self._cparts_cached: Optional[List[str]] = None

@classmethod
def _from_parts(cls, args, separator=SEPARATOR):
def __from_parts(cls, args: List[Any], separator: str = SEPARATOR) -> "BasePath":
self = cls(separator=separator)
self.parts = parse_args(args)
return self

@classmethod
def _from_parsed_parts(cls, parts, separator=SEPARATOR):
def __from_parsed_parts(
cls, parts: List[Hashable], separator: str = SEPARATOR
) -> "BasePath":
self = cls(separator=separator)
self.parts = parts
return self

@property
def _cparts(self):
def _cparts(self) -> List[Any]:
# Cached casefolded parts, for hashing and comparison
try:
return self._cparts_cached
except AttributeError:
if self._cparts_cached is None:
self._cparts_cached = self._get_cparts()
return self._cparts_cached
return self._cparts_cached

def _get_cparts(self):
def _get_cparts(self) -> List[str]:
return [str(p) for p in self.parts]

def _make_child(self, args):
def _make_child(self, args: List[Any]) -> "BasePath":
parts = parse_args(args, self.separator)
parts_joined = self.parts + parts
return self._from_parsed_parts(parts_joined, self.separator)
return self.__from_parsed_parts(parts_joined, self.separator)

def _make_child_relpath(self, part: Hashable) -> "BasePath":
# This is an optimization used for dir walking. `part` must be
# a single part relative to this path.
parts = self.parts + [part]
return self.__from_parsed_parts(parts, self.separator)

def __str__(self):
def __str__(self) -> str:
return self.separator.join(self._cparts)

def __repr__(self):
def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self)!r})"

def __hash__(self):
def __hash__(self) -> int:
return hash(tuple(self._cparts))

def __truediv__(self, key):
def __truediv__(self, key: Any) -> "BasePath":
try:
return self._make_child((key,))
return self._make_child(
[
key,
]
)
except TypeError:
return NotImplemented

def __rtruediv__(self, key):
def __rtruediv__(self, key: Hashable) -> "BasePath":
try:
return self._from_parts([key] + self.parts)
return self.__from_parts([key] + self.parts)
except TypeError:
return NotImplemented

def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
if not isinstance(other, BasePath):
return NotImplemented
return self._cparts == other._cparts

def __lt__(self, other):
def __lt__(self, other: Any) -> bool:
if not isinstance(other, BasePath):
return NotImplemented
return self._cparts < other._cparts

def __le__(self, other):
def __le__(self, other: Any) -> bool:
if not isinstance(other, BasePath):
return NotImplemented
return self._cparts <= other._cparts

def __gt__(self, other):
def __gt__(self, other: Any) -> bool:
if not isinstance(other, BasePath):
return NotImplemented
return self._cparts > other._cparts

def __ge__(self, other):
def __ge__(self, other: Any) -> bool:
if not isinstance(other, BasePath):
return NotImplemented
return self._cparts >= other._cparts


class AccessorPath(BasePath):
"""Path for object that can be read by accessor"""
"""Path for object that can be read by accessor."""

def __init__(self, accessor, *args, **kwargs):
def __init__(self, accessor: LookupAccessor, *args: Any, **kwargs: Any):
separator = kwargs.pop("separator", SEPARATOR)
super().__init__(*args, separator=separator)
self.accessor = accessor

self._content_cached: Optional[Any] = None

@classmethod
def _from_parsed_parts(cls, accessor, parts, separator=SEPARATOR):
def __from_parsed_parts(
cls,
accessor: LookupAccessor,
parts: List[Hashable],
separator: str = SEPARATOR,
) -> "AccessorPath":
self = cls(accessor, separator=separator)
self.parts = parts
return self

def __iter__(self):
def __iter__(self) -> Iterator["AccessorPath"]:
return self.iter()

def __getitem__(self, key):
def __getitem__(self, key: Hashable) -> Any:
with self.open() as d:
return d[key]

def __contains__(self, key):
def __contains__(self, key: Hashable) -> bool:
with self.open() as d:
return key in d

def __len__(self):
def __len__(self) -> int:
return self.accessor.len(self.parts)

def keys(self):
def keys(self) -> Any:
return self.accessor.keys(self.parts)

def getkey(self, key, default=None):
def getkey(self, key: Hashable, default: Any = None) -> Any:
"""Return the value for key if key is in the path, else default."""
with self.open() as d:
try:
return d[key]
except KeyError:
return default

@contextmanager
def open(self):
def open(self) -> Any:
"""Open the path."""
# Cached path content
try:
yield self._content_cached
except AttributeError:
if self._content_cached is None:
with self._open() as content:
self._content_cached = content
yield self._content_cached
else:
yield self._content_cached

def _open(self):
def _open(self) -> Any:
return self.accessor.open(self.parts)

def iter(self):
def iter(self) -> Iterator["AccessorPath"]:
"""Iterate over all child paths."""
for idx in range(self.accessor.len(self.parts)):
yield self._make_child_relpath(idx)

def iteritems(self):
def iteritems(self) -> Iterator[Tuple[Any, "AccessorPath"]]:
"""Return path's items."""
return self.items()

def items(self):
def items(self) -> Iterator[Tuple[Any, "AccessorPath"]]:
"""Return path's items."""
for key in self.accessor.keys(self.parts):
yield key, self._make_child_relpath(key)

def content(self):
def content(self) -> Any:
"""Return content of the path."""
with self.open() as d:
return d

def get(self, key, default=None):
def get(self, key: Hashable, default: Any = None) -> Any:
"""Return the child path for key if key is in the path, else default."""
if key in self:
return self.__truediv__(key)
return default

def _make_child(self, args):
def _make_child(self, args: List[Any]) -> "AccessorPath":
parts = parse_args(args, self.separator)
parts_joined = self.parts + parts
return self._from_parsed_parts(
return self.__from_parsed_parts(
self.accessor, parts_joined, self.separator
)

def _make_child_relpath(self, part):
def _make_child_relpath(self, part: Hashable) -> "AccessorPath":
# This is an optimization used for dir walking. `part` must be
# a single part relative to this path.
parts = self.parts + [part]
return self._from_parsed_parts(self.accessor, parts, self.separator)
return self.__from_parsed_parts(self.accessor, parts, self.separator)


class LookupPath(AccessorPath):
"""Path for object that supports __getitem__ lookups"""
"""Path for object that supports __getitem__ lookups."""

def __init__(self, lookup, *args, **kwargs):
def __init__(
self, lookup: Mapping[Hashable, Any], *args: Any, **kwargs: Any
):
accessor = LookupAccessor(lookup)
return super().__init__(accessor, *args, **kwargs)
Loading