Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flake8_async/visitors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
visitor111,
visitor118,
visitor123,
visitor430,
visitor_utility,
visitors,
)
84 changes: 84 additions & 0 deletions flake8_async/visitors/visitor430.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Visitor to check for pytest.raises(ExceptionGroup) usage.

ASYNC430: Suggests using pytest.RaisesGroup instead of pytest.raises(ExceptionGroup).
"""

from __future__ import annotations

import ast
from typing import TYPE_CHECKING, Any

from .flake8asyncvisitor import Flake8AsyncVisitor
from .helpers import error_class

if TYPE_CHECKING:
from collections.abc import Mapping


@error_class
class Visitor430(Flake8AsyncVisitor):
error_codes: Mapping[str, str] = {
"ASYNC430": (
"Using `pytest.raises(ExceptionGroup)` is discouraged, consider using "
"`pytest.RaisesGroup` instead."
)
}

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.imports_pytest: bool = False
self.imports_exceptiongroup: bool = False
self.async_function = False

def visit_AsyncFunctionDef(
self, node: ast.AsyncFunctionDef | ast.FunctionDef | ast.Lambda
):
self.save_state(node, "async_function")
self.async_function = isinstance(node, ast.AsyncFunctionDef)

visit_FunctionDef = visit_AsyncFunctionDef
visit_Lambda = visit_AsyncFunctionDef

def visit_Import(self, node: ast.Import) -> None:
for alias in node.names:
if alias.name == "pytest":
self.imports_pytest = True

def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.module == "pytest":
self.imports_pytest = True
elif node.module == "builtins" or node.module is None:
# Check for `from builtins import ExceptionGroup`
for alias in node.names:
if alias.name in ("ExceptionGroup", "BaseExceptionGroup"):
self.imports_exceptiongroup = True

def visit_Call(self, node: ast.Call) -> None:
if not self.async_function:
return

func_name = ast.unparse(node.func)

# Check for pytest.raises(ExceptionGroup) or pytest.raises(BaseExceptionGroup)
if not (
func_name == "pytest.raises"
or (self.imports_pytest and func_name == "raises")
):
return

# Check first argument (exception type)
if not node.args:
return

first_arg = node.args[0]
if isinstance(first_arg, ast.Name) and first_arg.id in (
"ExceptionGroup",
"BaseExceptionGroup",
):
self.error(node)
elif isinstance(first_arg, ast.Attribute) and first_arg.attr in (
"ExceptionGroup",
"BaseExceptionGroup",
):
# Handle pytest.raises(pytest.ExceptionGroup) or similar
self.error(node)
24 changes: 24 additions & 0 deletions tests/eval_files/async430.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# type: ignore
import pytest


async def test_pytest_raises_exceptiongroup():
with pytest.raises(ExceptionGroup): # ASYNC430: 9
pass


async def test_pytest_raises_baseexceptiongroup():
with pytest.raises(BaseExceptionGroup): # ASYNC430: 9
pass


async def test_pytest_raises_other():
# Should not error
with pytest.raises(ValueError):
pass


async def test_pytest_raises_group():
# Should not error - this is what we want users to use
with pytest.RaisesGroup(ExceptionGroup):
pass
Loading