Skip to content

Commit

Permalink
Merge pull request pytest-dev#345 from bluetech/hooks1
Browse files Browse the repository at this point in the history
Misc small improvements
  • Loading branch information
bluetech authored Jan 13, 2022
2 parents 4576615 + a489e0c commit 2b9998a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 62 deletions.
123 changes: 87 additions & 36 deletions src/pluggy/_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

if TYPE_CHECKING:
from typing_extensions import TypedDict
from typing_extensions import Final


_T = TypeVar("_T")
Expand Down Expand Up @@ -58,8 +59,10 @@ class HookspecMarker:
if the :py:class:`.PluginManager` uses the same project_name.
"""

__slots__ = ("project_name",)

def __init__(self, project_name: str) -> None:
self.project_name = project_name
self.project_name: "Final" = project_name

@overload
def __call__(
Expand Down Expand Up @@ -127,8 +130,10 @@ class HookimplMarker:
if the :py:class:`.PluginManager` uses the same project_name.
"""

__slots__ = ("project_name",)

def __init__(self, project_name: str) -> None:
self.project_name = project_name
self.project_name: "Final" = project_name

@overload
def __call__(
Expand Down Expand Up @@ -263,31 +268,41 @@ def varnames(func: object) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:

class _HookRelay:
"""hook holder object for performing 1:N hook calls where N is the number
of registered plugins.
of registered plugins."""

"""
__slots__ = ("__dict__",)

if TYPE_CHECKING:

def __getattr__(self, name: str) -> "_HookCaller":
...


_CallHistory = List[Tuple[Mapping[str, object], Optional[Callable[[Any], None]]]]


class _HookCaller:
__slots__ = (
"name",
"spec",
"_hookexec",
"_wrappers",
"_nonwrappers",
"_call_history",
)

def __init__(
self,
name: str,
hook_execute: _HookExec,
specmodule_or_class: Optional[_Namespace] = None,
spec_opts: Optional["_HookSpecOpts"] = None,
) -> None:
self.name = name
self._wrappers: List[HookImpl] = []
self._nonwrappers: List[HookImpl] = []
self._hookexec = hook_execute
self._call_history: Optional[
List[Tuple[Mapping[str, object], Optional[Callable[[Any], None]]]]
] = None
self.name: "Final" = name
self._hookexec: "Final" = hook_execute
self._wrappers: "Final[List[HookImpl]]" = []
self._nonwrappers: "Final[List[HookImpl]]" = []
self._call_history: Optional[_CallHistory] = None
self.spec: Optional[HookSpec] = None
if specmodule_or_class is not None:
assert spec_opts is not None
Expand Down Expand Up @@ -346,27 +361,32 @@ def _add_hookimpl(self, hookimpl: "HookImpl") -> None:
def __repr__(self) -> str:
return f"<_HookCaller {self.name!r}>"

def __call__(self, *args: object, **kwargs: object) -> Any:
if args:
raise TypeError("hook calling supports only keyword arguments")
assert not self.is_historic()

def _verify_all_args_are_provided(self, kwargs: Mapping[str, object]) -> None:
# This is written to avoid expensive operations when not needed.
if self.spec:
for argname in self.spec.argnames:
if argname not in kwargs:
notincall = tuple(set(self.spec.argnames) - kwargs.keys())
notincall = ", ".join(
repr(argname)
for argname in self.spec.argnames
# Avoid self.spec.argnames - kwargs.keys() - doesn't preserve order.
if argname not in kwargs.keys()
)
warnings.warn(
"Argument(s) {} which are declared in the hookspec "
"can not be found in this hook call".format(notincall),
"cannot be found in this hook call".format(notincall),
stacklevel=2,
)
break

firstresult = self.spec.opts.get("firstresult", False)
else:
firstresult = False

def __call__(self, *args: object, **kwargs: object) -> Any:
if args:
raise TypeError("hook calling supports only keyword arguments")
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)

def call_historic(
Expand All @@ -382,6 +402,7 @@ def call_historic(
"""
assert self._call_history is not None
kwargs = kwargs or {}
self._verify_all_args_are_provided(kwargs)
self._call_history.append((kwargs, result_callback))
# Historizing hooks don't return results.
# Remember firstresult isn't compatible with historic.
Expand All @@ -397,21 +418,28 @@ def call_extra(
) -> Any:
"""Call the hook with some additional temporarily participating
methods using the specified ``kwargs`` as call parameters."""
old = list(self._nonwrappers), list(self._wrappers)
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
opts: "_HookImplOpts" = {
"hookwrapper": False,
"optionalhook": False,
"trylast": False,
"tryfirst": False,
"specname": None,
}
hookimpls = self.get_hookimpls()
for method in methods:
opts: "_HookImplOpts" = {
"hookwrapper": False,
"optionalhook": False,
"trylast": False,
"tryfirst": False,
"specname": None,
}
hookimpl = HookImpl(None, "<temp>", method, opts)
self._add_hookimpl(hookimpl)
try:
return self(**kwargs)
finally:
self._nonwrappers, self._wrappers = old
# Find last non-tryfirst nonwrapper method.
i = len(hookimpls) - 1
until = len(self._nonwrappers)
while i >= until and hookimpls[i].tryfirst:
i -= 1
hookimpls.insert(i + 1, hookimpl)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
return self._hookexec(self.name, hookimpls, kwargs, firstresult)

def _maybe_apply_history(self, method: "HookImpl") -> None:
"""Apply call history to a new hookimpl if it is marked as historic."""
Expand All @@ -426,14 +454,27 @@ def _maybe_apply_history(self, method: "HookImpl") -> None:


class HookImpl:
__slots__ = (
"function",
"argnames",
"kwargnames",
"plugin",
"opts",
"plugin_name",
"hookwrapper",
"optionalhook",
"tryfirst",
"trylast",
)

def __init__(
self,
plugin: _Plugin,
plugin_name: str,
function: _HookImplFunction[object],
hook_impl_opts: "_HookImplOpts",
) -> None:
self.function = function
self.function: "Final" = function
self.argnames, self.kwargnames = varnames(self.function)
self.plugin = plugin
self.opts = hook_impl_opts
Expand All @@ -448,6 +489,16 @@ def __repr__(self) -> str:


class HookSpec:
__slots__ = (
"namespace",
"function",
"name",
"argnames",
"kwargnames",
"opts",
"warn_on_impl",
)

def __init__(self, namespace: _Namespace, name: str, opts: "_HookSpecOpts") -> None:
self.namespace = namespace
self.function: Callable[..., object] = getattr(namespace, name)
Expand Down
24 changes: 18 additions & 6 deletions src/pluggy/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import importlib_metadata

if TYPE_CHECKING:
from typing_extensions import Final

from ._hooks import _HookImplOpts, _HookSpecOpts

_BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None]
Expand Down Expand Up @@ -99,13 +101,23 @@ class PluginManager:
which will subsequently send debug information to the trace helper.
"""

__slots__ = (
"project_name",
"_name2plugin",
"_plugin2hookcallers",
"_plugin_distinfo",
"trace",
"hook",
"_inner_hookexec",
)

def __init__(self, project_name: str) -> None:
self.project_name = project_name
self._name2plugin: Dict[str, _Plugin] = {}
self._plugin2hookcallers: Dict[_Plugin, List[_HookCaller]] = {}
self._plugin_distinfo: List[Tuple[_Plugin, DistFacade]] = []
self.trace = _tracing.TagTracer().get("pluginmanage")
self.hook = _HookRelay()
self.project_name: "Final" = project_name
self._name2plugin: "Final[Dict[str, _Plugin]]" = {}
self._plugin2hookcallers: "Final[Dict[_Plugin, List[_HookCaller]]]" = {}
self._plugin_distinfo: "Final[List[Tuple[_Plugin, DistFacade]]]" = []
self.trace: "Final" = _tracing.TagTracer().get("pluginmanage")
self.hook: "Final" = _HookRelay()
self._inner_hookexec = _multicall

def _hookexec(
Expand Down
2 changes: 2 additions & 0 deletions src/pluggy/_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class HookCallError(Exception):


class _Result(Generic[_T]):
__slots__ = ("_result", "_excinfo")

def __init__(self, result: Optional[_T], excinfo: Optional[_ExcInfo]) -> None:
self._result = result
self._excinfo = excinfo
Expand Down
36 changes: 16 additions & 20 deletions testing/test_details.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import warnings
import pytest
from pluggy import PluginManager, HookimplMarker, HookspecMarker

Expand Down Expand Up @@ -89,36 +88,33 @@ class Module:
assert pm.get_plugin("donttouch") is module


def test_warning_on_call_vs_hookspec_arg_mismatch() -> None:
"""Verify that is a hook is called with less arguments then defined in the
spec that a warning is emitted.
"""
def test_not_all_arguments_are_provided_issues_a_warning(pm: PluginManager) -> None:
"""Calling a hook without providing all arguments specified in
the hook spec issues a warning."""

class Spec:
@hookspec
def myhook(self, arg1, arg2):
def hello(self, arg1, arg2):
pass

class Plugin:
@hookimpl
def myhook(self, arg1):
@hookspec(historic=True)
def herstory(self, arg1, arg2):
pass

pm = PluginManager(hookspec.project_name)
pm.register(Plugin())
pm.add_hookspecs(Spec)

with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
with pytest.warns(UserWarning, match=r"'arg1', 'arg2'.*cannot be found.*$"):
pm.hook.hello()
with pytest.warns(UserWarning, match=r"'arg2'.*cannot be found.*$"):
pm.hook.hello(arg1=1)
with pytest.warns(UserWarning, match=r"'arg1'.*cannot be found.*$"):
pm.hook.hello(arg2=2)

# calling should trigger a warning
pm.hook.myhook(arg1=1)
with pytest.warns(UserWarning, match=r"'arg1', 'arg2'.*cannot be found.*$"):
pm.hook.hello.call_extra([], kwargs=dict())

assert warns is not None
assert len(warns) == 1
warning = warns[-1]
assert issubclass(warning.category, Warning)
assert "Argument(s) ('arg2',)" in str(warning.message)
with pytest.warns(UserWarning, match=r"'arg1', 'arg2'.*cannot be found.*$"):
pm.hook.herstory.call_historic(kwargs=dict())


def test_repr() -> None:
Expand Down

0 comments on commit 2b9998a

Please sign in to comment.