Skip to content

Commit

Permalink
0.8.0 (#39)
Browse files Browse the repository at this point in the history
* 🐛 Patch classes if they have piping operator method

* ✨ Auto-register numpy ufuncs

* Fix linting

* pump executing to 1.1.1 to fix pwwang/datar#149

* 0.8.0
  • Loading branch information
pwwang authored Oct 8, 2022
1 parent e77503b commit d9cd3d5
Show file tree
Hide file tree
Showing 13 changed files with 563 additions and 89 deletions.
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change Log

## 0.8.0

- patch classes if they have piping operator method
- auto register numpy ufuncs
- pump executing to 1.1.1 to fix pwwang/datar#149

## 0.7.6

- 🐛 Fix `numpy.ndarray` as data argument for verbs
Expand Down
4 changes: 2 additions & 2 deletions pipda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from .verb import (
Verb,
VerbCall,
register_piping,
register_verb,
)
from .piping import register_piping

__version__ = "0.7.6"
__version__ = "0.8.0"
33 changes: 31 additions & 2 deletions pipda/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

from abc import ABC, abstractmethod
from functools import partialmethod
from typing import Any, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Callable

from .context import ContextBase

if TYPE_CHECKING:
from .operator import OperatorCall
from .function import FunctionCall
from .reference import ReferenceAttr, ReferenceItem

OPERATORS = {
Expand Down Expand Up @@ -55,7 +56,35 @@ class Expression(ABC):
"""The abstract Expression class"""

_pipda_operator = None
__array_ufunc__ = None

def __array_ufunc__(
self,
ufunc: Callable,
method: str,
*inputs: Any,
**kwargs: Any,
) -> FunctionCall:
"""Allow numpy ufunc to work on Expression objects"""

from .piping import PIPING_OPS
from .verb import VerbCall

if (
ufunc.__name__ == PIPING_OPS[VerbCall.PIPING][2]
and isinstance(inputs[1], VerbCall)
and len(inputs) == 2
and method == "__call__"
):
# We can't patch numpy.ndarray
return inputs[1]._pipda_eval(inputs[0])

from .function import Function, FunctionCall

if method == "reduce":
ufunc = ufunc.reduce

fun = Function(ufunc, None, {})
return FunctionCall(fun, *inputs, **kwargs)

def __hash__(self) -> int:
"""Make it hashable"""
Expand Down
26 changes: 19 additions & 7 deletions pipda/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,27 @@ def _pipda_eval(self, data: Any, context: ContextType = None) -> Any:
},
)

bound = func.bind_arguments(*self._pipda_args, **self._pipda_kwargs)
context = func.contexts["_"] or context
extra_contexts = func.extra_contexts
for key, val in bound.arguments.items():
ctx = extra_contexts["_"].get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val
extra_contexts = func.extra_contexts["_"]

return func.func(*bound.args, **bound.kwargs)
if extra_contexts:
bound = func.bind_arguments(*self._pipda_args, **self._pipda_kwargs)

for key, val in bound.arguments.items():
ctx = extra_contexts.get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val

return func.func(*bound.args, **bound.kwargs)

# we don't need signature if there is no extra context
return func.func(
*(evaluate_expr(arg, data, context) for arg in self._pipda_args),
**{
key: evaluate_expr(val, data, context)
for key, val in self._pipda_kwargs.items()
},
)


class Registered(ABC):
Expand Down
177 changes: 177 additions & 0 deletions pipda/piping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import ast
import functools
from typing import Type, Dict, Callable

from .verb import VerbCall

PIPING_OPS = {
# op: (method, ast node, numpy ufunc name)
">>": ("__rrshift__", ast.RShift, "right_shift"),
"|": ("__ror__", ast.BitOr, "bitwise_or"),
"//": ("__rfloordiv__", ast.FloorDiv, "floor_divide"),
"@": ("__rmatmul__", ast.MatMult, "matmul"),
"%": ("__rmod__", ast.Mod, "remainder"),
"&": ("__rand__", ast.BitAnd, "bitwise_and"),
"^": ("__rxor__", ast.BitXor, "bitwise_xor"),
}

PATCHED_CLASSES: Dict[Type, Dict[str, Callable]] = {
# kls:
# {} # registered but not patched
# {"method": <method>, "imethod": <imethod>} # patched
}


def _patch_cls_method(kls: Type, method: str) -> None:
"""Borrowed from https://github.com/sspipe/sspipe"""
try:
original = getattr(kls, method)
except AttributeError:
return

PATCHED_CLASSES[kls][method] = original

@functools.wraps(original)
def wrapper(self, x, *args, **kwargs):
if isinstance(x, VerbCall):
return NotImplemented
return original(self, x, *args, **kwargs)

setattr(kls, method, wrapper)


def _unpatch_cls_method(kls: Type, method: str) -> None:
if method in PATCHED_CLASSES[kls]:
setattr(kls, method, PATCHED_CLASSES[kls].pop(method))


def _patch_cls_operator(kls: Type, op: str) -> None:
method = PIPING_OPS[op][0].replace("__r", "__")
imethod = PIPING_OPS[op][0].replace("__r", "__i")
_patch_cls_method(kls, method)
_patch_cls_method(kls, imethod)


def _unpatch_cls_operator(kls: Type, op: str) -> None:
method = PIPING_OPS[op][0].replace("__r", "__")
imethod = PIPING_OPS[op][0].replace("__r", "__i")
_unpatch_cls_method(kls, method)
_unpatch_cls_method(kls, imethod)


def patch_classes(*classes: Type) -> None:
"""Patch the classes in case it has piping operator defined
For example, DataFrame.__or__ has already been defined, so we need to
patch it to force it to use __ror__ of VerbCall if `|` is registered
for piping.
Args:
classes: The classes to patch
"""
for kls in classes:
if kls not in PATCHED_CLASSES:
PATCHED_CLASSES[kls] = {}

if not PATCHED_CLASSES[kls]:
_patch_cls_operator(kls, VerbCall.PIPING)


def unpatch_classes(*classes: Type) -> None:
"""Unpatch the classes
Args:
classes: The classes to unpatch
"""
for kls in classes:
if PATCHED_CLASSES[kls]:
_unpatch_cls_operator(kls, VerbCall.PIPING)
# Don't patch it in the future
del PATCHED_CLASSES[kls]


def _patch_all(op: str) -> None:
"""Patch all registered classes that has the operator defined
Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
un: Unpatch the classes
"""
for kls in PATCHED_CLASSES:
_patch_cls_operator(kls, op)


def _unpatch_all(op: str) -> None:
"""Unpatch all registered classes
Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
"""
for kls in PATCHED_CLASSES:
_unpatch_cls_operator(kls, op)


def _patch_default_classes() -> None:
"""Patch the default/commonly used classes"""

try:
import pandas
patch_classes(
pandas.DataFrame,
pandas.Series,
pandas.Index,
pandas.Categorical,
)
except ImportError:
pass

try: # pragma: no cover
from modin import pandas
patch_classes(
pandas.DataFrame,
pandas.Series,
pandas.Index,
pandas.Categorical,
)
except ImportError:
pass

try: # pragma: no cover
import torch
patch_classes(torch.Tensor)
except ImportError:
pass

try: # pragma: no cover
from django.db.models import query
patch_classes(query.QuerySet)
except ImportError:
pass


def register_piping(op: str) -> None:
"""Register the piping operator for verbs
Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
"""
if op not in PIPING_OPS:
raise ValueError(f"Unsupported piping operator: {op}")

if VerbCall.PIPING:
orig_method = VerbCall.__orig_opmethod__
curr_method = PIPING_OPS[VerbCall.PIPING][0]
setattr(VerbCall, curr_method, orig_method)
_unpatch_all(VerbCall.PIPING)

VerbCall.PIPING = op
VerbCall.__orig_opmethod__ = getattr(VerbCall, PIPING_OPS[op][0])
setattr(VerbCall, PIPING_OPS[op][0], VerbCall._pipda_eval)
_patch_all(op)


register_piping(">>")
_patch_default_classes()
3 changes: 2 additions & 1 deletion pipda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def is_piping_verbcall(verb: str, fallback: str) -> bool:
True if it is a piping verb call, otherwise False
"""
from executing import Source
from .verb import PIPING_OPS, VerbCall
from .verb import VerbCall
from .piping import PIPING_OPS

frame = sys._getframe(2)
node = Source.executing(frame).node
Expand Down
61 changes: 20 additions & 41 deletions pipda/verb.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Provide verb definition"""
from __future__ import annotations

import ast
from enum import Enum
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -29,16 +28,6 @@
from inspect import Signature
from .context import ContextType

PIPING_OPS = {
">>": ("__rrshift__", ast.RShift),
"|": ("__ror__", ast.BitOr),
"//": ("__rfloordiv__", ast.FloorDiv),
"@": ("__rmatmul__", ast.MatMult),
"%": ("__rmod__", ast.Mod),
"&": ("__rand__", ast.BitAnd),
"^": ("__rxor__", ast.BitXor),
}


class VerbCall(Expression):
"""A verb call
Expand Down Expand Up @@ -91,17 +80,28 @@ def _pipda_eval(self, data: Any, context: ContextType = None) -> Any:
self._pipda_func.extra_contexts.get(func, None)
or self._pipda_func.extra_contexts["_"]
)
bound = self._pipda_func.bind_arguments(
if extra_contexts:
bound = self._pipda_func.bind_arguments(
data,
*self._pipda_args,
**self._pipda_kwargs,
)
for key, val in bound.arguments.items():
ctx = extra_contexts.get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val

return func(*bound.args, **bound.kwargs)

# we don't need signature if there is no extra context
return func(
data,
*self._pipda_args,
**self._pipda_kwargs,
*(evaluate_expr(arg, data, context) for arg in self._pipda_args),
**{
key: evaluate_expr(val, data, context)
for key, val in self._pipda_kwargs.items()
},
)
for key, val in bound.arguments.items():
ctx = extra_contexts.get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val

return func(*bound.args, **bound.kwargs)


class Verb(Registered):
Expand Down Expand Up @@ -298,24 +298,3 @@ def register_verb(
dep=dep,
ast_fallback=ast_fallback,
)


def register_piping(op: str) -> None:
"""Register the piping operator for verbs
Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
"""
if op not in PIPING_OPS:
raise ValueError(f"Unsupported piping operator: {op}")

if VerbCall.PIPING:
curr_method = PIPING_OPS[VerbCall.PIPING][0]
delattr(VerbCall, curr_method)

VerbCall.PIPING = op
setattr(VerbCall, PIPING_OPS[op][0], VerbCall._pipda_eval)


register_piping(">>")
Loading

0 comments on commit d9cd3d5

Please sign in to comment.