Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions src/mutmut/file_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,46 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation
mutated_module = module.with_changes(body=result)
return mutated_module.code, mutation_names

def _contains_yield(node: cst.CSTNode) -> bool:
"""Recursively check if a node contains a Yield, but don't recurse into nested functions/lambdas."""
if isinstance(node, cst.Yield):
return True
if isinstance(node, (cst.FunctionDef, cst.Lambda)):
# Don't recurse into nested functions or lambdas
return False

# Check all children
for child in node.children:
if _contains_yield(child):
return True
return False


def is_generator_function(function: cst.FunctionDef) -> bool:
"""Check if a function is a generator (contains yield or yield from).

Note: This checks for Yield nodes in the function body, but doesn't
recurse into nested functions or lambdas since Yield in those would
make them generators, not the outer function.
"""
return _contains_yield(function.body)


def function_trampoline_arrangement(function: cst.FunctionDef, mutants: Iterable[Mutation], class_name: Union[str, None]) -> tuple[Sequence[MODULE_STATEMENT], Sequence[str]]:
"""Create mutated functions and a trampoline that switches between original and mutated versions.

:return: A tuple of (nodes, mutant names)"""
nodes: list[MODULE_STATEMENT] = []
mutant_names: list[str] = []

name = function.name.value
mangled_name = mangle_function_name(name=name, class_name=class_name) + '__mutmut'

# Detect if the function is async and/or a generator
is_async = function.asynchronous is not None
is_generator = is_generator_function(function)
is_async_generator = is_async and is_generator

# copy of original function
nodes.append(function.with_changes(name=cst.Name(mangled_name + '_orig')))

Expand All @@ -248,7 +278,13 @@ def function_trampoline_arrangement(function: cst.FunctionDef, mutants: Iterable
nodes.append(mutated_method) # type: ignore

# trampoline that forwards the calls
trampoline = list(cst.parse_module(build_trampoline(orig_name=name, mutants=mutant_names, class_name=class_name)).body)
trampoline = list(cst.parse_module(build_trampoline(
orig_name=name,
mutants=mutant_names,
class_name=class_name,
is_async=is_async,
is_async_generator=is_async_generator
)).body)
trampoline[0] = trampoline[0].with_changes(leading_lines=[cst.EmptyLine()])
nodes.extend(trampoline)

Expand Down
62 changes: 55 additions & 7 deletions src/mutmut/trampoline_templates.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CLASS_NAME_SEPARATOR = 'ǁ'

def build_trampoline(*, orig_name, mutants, class_name):
def build_trampoline(*, orig_name, mutants, class_name, is_async=False, is_async_generator=False):
mangled_name = mangle_function_name(name=orig_name, class_name=class_name)

mutants_dict = f'{mangled_name}__mutmut_mutants : ClassVar[MutantDict] = {{\n' + ', \n '.join(f'{repr(m)}: {m}' for m in mutants) + '\n}'
Expand All @@ -12,14 +12,37 @@ def build_trampoline(*, orig_name, mutants, class_name):
access_suffix = '")'
self_arg = ', self'

trampoline_name = '_mutmut_trampoline'
# Use appropriate trampoline based on function type
if is_async_generator:
# Async generators need a wrapper that yields from the original
# This ensures asyncio.isasyncgenfunction() returns True
return f"""
{mutants_dict}

async def {orig_name}({'self, ' if class_name is not None else ''}*args, **kwargs):
gen = _mutmut_trampoline({access_prefix}{mangled_name}__mutmut_orig{access_suffix}, {access_prefix}{mangled_name}__mutmut_mutants{access_suffix}, args, kwargs{self_arg})
async for item in gen:
yield item

{orig_name}.__signature__ = _mutmut_signature({mangled_name}__mutmut_orig)
{mangled_name}__mutmut_orig.__name__ = '{mangled_name}'
"""
elif is_async:
# Regular async functions need async def and should await the result
trampoline_name = '_mutmut_trampoline_async'
async_keyword = 'async '
await_keyword = 'await '
else:
trampoline_name = '_mutmut_trampoline'
async_keyword = ''
await_keyword = ''

return f"""
{mutants_dict}

def {orig_name}({'self, ' if class_name is not None else ''}*args, **kwargs):
result = {trampoline_name}({access_prefix}{mangled_name}__mutmut_orig{access_suffix}, {access_prefix}{mangled_name}__mutmut_mutants{access_suffix}, args, kwargs{self_arg})
return result
{async_keyword}def {orig_name}({'self, ' if class_name is not None else ''}*args, **kwargs):
result = {await_keyword}{trampoline_name}({access_prefix}{mangled_name}__mutmut_orig{access_suffix}, {access_prefix}{mangled_name}__mutmut_mutants{access_suffix}, args, kwargs{self_arg})
return result

{orig_name}.__signature__ = _mutmut_signature({mangled_name}__mutmut_orig)
{mangled_name}__mutmut_orig.__name__ = '{mangled_name}'
Expand Down Expand Up @@ -47,12 +70,12 @@ def mangle_function_name(*, name, class_name):


def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None):
\"""Forward call to original or mutated function, depending on the environment\"""
\"\"\"Forward call to original or mutated function, depending on the environment\"\"\"
import os
mutant_under_test = os.environ['MUTANT_UNDER_TEST']
if mutant_under_test == 'fail':
from mutmut.__main__ import MutmutProgrammaticFailException
raise MutmutProgrammaticFailException('Failed programmatically')
raise MutmutProgrammaticFailException('Failed programmatically')
elif mutant_under_test == 'stats':
from mutmut.__main__ import record_trampoline_hit
record_trampoline_hit(orig.__module__ + '.' + orig.__name__)
Expand All @@ -70,4 +93,29 @@ def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None):
result = mutants[mutant_name](*call_args, **call_kwargs)
return result


async def _mutmut_trampoline_async(orig, mutants, call_args, call_kwargs, self_arg = None):
\"\"\"Forward call to original or mutated async function, depending on the environment\"\"\"
import os
mutant_under_test = os.environ['MUTANT_UNDER_TEST']
if mutant_under_test == 'fail':
from mutmut.__main__ import MutmutProgrammaticFailException
raise MutmutProgrammaticFailException('Failed programmatically')
elif mutant_under_test == 'stats':
from mutmut.__main__ import record_trampoline_hit
record_trampoline_hit(orig.__module__ + '.' + orig.__name__)
result = await orig(*call_args, **call_kwargs)
return result
prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_'
if not mutant_under_test.startswith(prefix):
result = await orig(*call_args, **call_kwargs)
return result
mutant_name = mutant_under_test.rpartition('.')[-1]
if self_arg is not None:
# call to a class method where self is not bound
result = await mutants[mutant_name](self_arg, *call_args, **call_kwargs)
else:
result = await mutants[mutant_name](*call_args, **call_kwargs)
return result

"""
29 changes: 18 additions & 11 deletions tests/test_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,11 @@ def inner():
assert mutants == [expected]


def normalize_whitespace(s: str) -> str:
"""Normalize whitespace by stripping trailing spaces from each line."""
return '\n'.join(line.rstrip() for line in s.split('\n'))


def test_module_mutation():
source = """from __future__ import division
import lib
Expand All @@ -679,7 +684,7 @@ def add(self, value):

src, _ = mutate_file_contents("file.py", source)

assert src == f"""from __future__ import division
expected = f"""from __future__ import division
import lib

lib.foo()
Expand All @@ -697,7 +702,7 @@ def x_foo__mutmut_1(a, b):

def foo(*args, **kwargs):
result = _mutmut_trampoline(x_foo__mutmut_orig, x_foo__mutmut_mutants, args, kwargs)
return result
return result

foo.__signature__ = _mutmut_signature(x_foo__mutmut_orig)
x_foo__mutmut_orig.__name__ = 'x_foo'
Expand All @@ -714,7 +719,7 @@ def x_bar__mutmut_1():

def bar(*args, **kwargs):
result = _mutmut_trampoline(x_bar__mutmut_orig, x_bar__mutmut_mutants, args, kwargs)
return result
return result

bar.__signature__ = _mutmut_signature(x_bar__mutmut_orig)
x_bar__mutmut_orig.__name__ = 'x_bar'
Expand All @@ -724,15 +729,15 @@ def xǁAdderǁ__init____mutmut_orig(self, amount):
self.amount = amount
def xǁAdderǁ__init____mutmut_1(self, amount):
self.amount = None

xǁAdderǁ__init____mutmut_mutants : ClassVar[MutantDict] = {{
'xǁAdderǁ__init____mutmut_1': xǁAdderǁ__init____mutmut_1
}}

def __init__(self, *args, **kwargs):
result = _mutmut_trampoline(object.__getattribute__(self, "xǁAdderǁ__init____mutmut_orig"), object.__getattribute__(self, "xǁAdderǁ__init____mutmut_mutants"), args, kwargs, self)
return result
return result

__init__.__signature__ = _mutmut_signature(xǁAdderǁ__init____mutmut_orig)
xǁAdderǁ__init____mutmut_orig.__name__ = 'xǁAdderǁ__init__'

Expand All @@ -741,16 +746,18 @@ def xǁAdderǁadd__mutmut_orig(self, value):

def xǁAdderǁadd__mutmut_1(self, value):
return self.amount - value

xǁAdderǁadd__mutmut_mutants : ClassVar[MutantDict] = {{
'xǁAdderǁadd__mutmut_1': xǁAdderǁadd__mutmut_1
}}

def add(self, *args, **kwargs):
result = _mutmut_trampoline(object.__getattribute__(self, "xǁAdderǁadd__mutmut_orig"), object.__getattribute__(self, "xǁAdderǁadd__mutmut_mutants"), args, kwargs, self)
return result
return result

add.__signature__ = _mutmut_signature(xǁAdderǁadd__mutmut_orig)
xǁAdderǁadd__mutmut_orig.__name__ = 'xǁAdderǁadd'

print(Adder(1).add(2))"""

assert normalize_whitespace(src) == normalize_whitespace(expected)
4 changes: 2 additions & 2 deletions tests/test_mutmut3.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def x_foo__mutmut_2(a, b, c):

def foo(*args, **kwargs):
result = _mutmut_trampoline(x_foo__mutmut_orig, x_foo__mutmut_mutants, args, kwargs)
return result
return result

foo.__signature__ = _mutmut_signature(x_foo__mutmut_orig)
x_foo__mutmut_orig.__name__ = 'x_foo'
Expand Down Expand Up @@ -64,7 +64,7 @@ def x_foo__mutmut_1(a: List[int]) -> int:

def foo(*args, **kwargs):
result = _mutmut_trampoline(x_foo__mutmut_orig, x_foo__mutmut_mutants, args, kwargs)
return result
return result

foo.__signature__ = _mutmut_signature(x_foo__mutmut_orig)
x_foo__mutmut_orig.__name__ = 'x_foo'
Expand Down