Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions pathable/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
from typing import Union

from pathable.accessors import NodeAccessor
from pathable.accessors import LookupAccessor
from pathable.accessors import LookupAccessor, N, K, V
from pathable.parsers import SEPARATOR
from pathable.parsers import parse_args
from pathable.types import LookupKey
from pathable.types import LookupNode
from pathable.types import LookupValue

TBasePath = TypeVar("TBasePath", bound="BasePath")
TAccessorPath = TypeVar("TAccessorPath", bound="AccessorPath[Any]")
TNodeAccessor = TypeVar("TNodeAccessor", bound=NodeAccessor[Any, Any, Any])
TLookupPath = TypeVar("TLookupPath", bound="LookupPath")


@dataclass(frozen=True, init=False)
Expand Down Expand Up @@ -118,21 +117,22 @@ def __ge__(self, other: Any) -> bool:
return self._cparts >= other._cparts


class AccessorPath(BasePath, Generic[TNodeAccessor]):
class AccessorPath(BasePath, Generic[N, K, V]):
"""Path for object that can be read by accessor."""
accessor: TNodeAccessor
parts: tuple[K, ...]
accessor: NodeAccessor[N, K, V]

def __init__(self, accessor: TNodeAccessor, *args: Any, separator: Optional[str] = None):
def __init__(self, accessor: NodeAccessor[N, K, V], *args: Any, separator: Optional[str] = None):
object.__setattr__(self, 'accessor', accessor)
super().__init__(*args, separator=separator)

@classmethod
def _from_parsed_parts(
cls: Type[TAccessorPath],
cls: Type["AccessorPath[N, K, V]"],
parts: tuple[Hashable, ...],
separator: Optional[str] = None,
accessor: Union[TNodeAccessor, None] = None,
) -> TAccessorPath:
accessor: Union[NodeAccessor[N, K, V], None] = None,
) -> "AccessorPath[N, K, V]":
if accessor is None:
raise ValueError("accessor must be provided")
instance = cls.__new__(cls)
Expand All @@ -141,43 +141,43 @@ def _from_parsed_parts(
object.__setattr__(instance, 'accessor', accessor)
return instance

def _make_child(self: TAccessorPath, args: list[Any]) -> TAccessorPath:
def _make_child(self: "AccessorPath[N, K, V]", args: list[Any]) -> "AccessorPath[N, K, V]":
parts = parse_args(args, self.separator)
parts_joined = self.parts + parts
return self._from_parsed_parts(
parts_joined, separator=self.separator, accessor=self.accessor,
)

def _make_child_relpath(
self: TAccessorPath, part: Hashable
) -> TAccessorPath:
self: "AccessorPath[N, K, V]", part: Hashable
) -> "AccessorPath[N, K, V]":
# This is an optimization used for dir walking. `part` must be
# a single part relative to this path.
parts = self.parts + (part, )
return self._from_parsed_parts(
parts, separator=self.separator, accessor=self.accessor,
)

def __iter__(self: TAccessorPath) -> Iterator[TAccessorPath]:
def __iter__(self: "AccessorPath[N, K, V]") -> Iterator["AccessorPath[N, K, V]"]:
for key in self.accessor.keys(self.parts):
yield self._make_child_relpath(key)

def __getitem__(self, key: Hashable) -> Any:
def __getitem__(self, key: K) -> Any:
if key not in self:
raise KeyError(key)
path = self / key
return path.read_value()

def __contains__(self, key: Hashable) -> bool:
def __contains__(self, key: K) -> bool:
return key in self.accessor.keys(self.parts)

def __len__(self) -> int:
return self.accessor.len(self.parts)

def keys(self) -> Any:
def keys(self) -> Sequence[K]:
return self.accessor.keys(self.parts)

def getkey(self, key: Hashable, default: Any = None) -> Any:
def getkey(self, key: K, default: Any = None) -> Any:
"""Return the value for key if key is in the path, else default."""
warnings.warn(
"'getkey' method is deprecated. Use 'key not in path' and 'path.read_value' instead.",
Expand All @@ -189,23 +189,23 @@ def getkey(self, key: Hashable, default: Any = None) -> Any:
except KeyError:
return default

def iter(self: TAccessorPath) -> Iterator[TAccessorPath]:
def iter(self: "AccessorPath[N, K, V]") -> Iterator["AccessorPath[N, K, V]"]:
"""Iterate over all child paths."""
warnings.warn(
"'iter' method is deprecated. Use 'iter(path)' instead.",
DeprecationWarning,
)
return iter(self)

def iteritems(self: TAccessorPath) -> Iterator[tuple[Any, TAccessorPath]]:
def iteritems(self: "AccessorPath[N, K, V]") -> Iterator[tuple[Any, "AccessorPath[N, K, V]"]]:
"""Return path's items."""
warnings.warn(
"'iteritems' method is deprecated. Use 'items' instead.",
DeprecationWarning,
)
return self.items()

def items(self: TAccessorPath) -> Iterator[tuple[Any, TAccessorPath]]:
def items(self: "AccessorPath[N, K, V]") -> Iterator[tuple[Any, "AccessorPath[N, K, V]"]]:
"""Return path's items."""
for key in self.accessor.keys(self.parts):
yield key, self._make_child_relpath(key)
Expand All @@ -217,7 +217,7 @@ def content(self) -> Any:
)
return self.read_value()

def get(self, key: Hashable, default: Any = None) -> Any:
def get(self, key: K, default: Any = None) -> Any:
"""Return the child path for key if key is in the path, else default."""
warnings.warn(
"'get' method is deprecated. Use 'key in path' and 'path / key' instead.",
Expand All @@ -243,7 +243,7 @@ def read_value(self) -> Any:
return self.accessor.read(self.parts)


class LookupPath(AccessorPath[LookupAccessor]):
class LookupPath(AccessorPath[LookupNode, LookupKey, LookupValue]):
"""Path for object that supports __getitem__ lookups."""

@classmethod
Expand Down
Loading