Skip to content

Commit

Permalink
Infinite timeout when reading stdout with callback (#169)
Browse files Browse the repository at this point in the history
* initial implementation

* renaming

* fix AsyncFakePopen::_finalize

* changelog entry

---------

Co-authored-by: Michal Poddubiuk <mpoddubi@akamai.com>
  • Loading branch information
meeshal and Michal Poddubiuk authored Oct 2, 2024
1 parent 8a7d9d0 commit 570fbaf
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 27 deletions.
6 changes: 6 additions & 0 deletions changelog.d/bug.75e40e8c.entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
message: Get rid of using thread in AsyncFakePopen as it causes thread.join() to hang
indefinitely.
pr_ids:
- '169'
timestamp: 1727847419
type: bug
72 changes: 54 additions & 18 deletions pytest_subprocess/fake_popen.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import collections.abc
import concurrent.futures
import io
import os
import signal
Expand Down Expand Up @@ -69,14 +70,14 @@ def __init__(
self.args = command
self.__stdout: OPTIONAL_TEXT_OR_ITERABLE = stdout
self.__stderr: OPTIONAL_TEXT_OR_ITERABLE = stderr
self.__returncode: Optional[int] = returncode
self.__wait: Optional[float] = wait
self.__thread: Optional[Thread] = None
self.__callback: Optional[Optional[Callable]] = callback
self.__callback_kwargs: Optional[Dict[str, AnyType]] = callback_kwargs
self.__signal_callback: Optional[Callable] = signal_callback
self.__stdin_callable: Optional[Optional[Callable]] = stdin_callable
self._signals: List[int] = []
self._returncode: Optional[int] = returncode
self._wait_timeout: Optional[float] = wait
self._callback: Optional[Optional[Callable]] = callback
self._callback_kwargs: Optional[Dict[str, AnyType]] = callback_kwargs

def __enter__(self) -> "FakePopen":
return self
Expand Down Expand Up @@ -116,8 +117,8 @@ def _finalize_thread(self, timeout: Optional[float]) -> None:
if self.__thread is None:
return
self.__thread.join(timeout)
if self.returncode is None and self.__returncode is not None:
self.returncode = self.__returncode
if self.returncode is None and self._returncode is not None:
self.returncode = self._returncode
if self.__thread.exception:
raise self.__thread.exception

Expand All @@ -133,8 +134,8 @@ def poll(self) -> Optional[int]:
return self.returncode

def wait(self, timeout: Optional[float] = None) -> int:
if timeout and self.__wait and timeout < self.__wait:
self.__wait -= timeout
if timeout and self._wait_timeout and timeout < self._wait_timeout:
self._wait_timeout -= timeout
raise subprocess.TimeoutExpired(self.args, timeout)
self._finalize_thread(timeout)
if self.returncode is None:
Expand Down Expand Up @@ -285,21 +286,21 @@ def _wait(self, wait_period: float) -> None:

def run_thread(self) -> None:
"""Run the user-defined callback or wait in a thread."""
if self.__wait is None and self.__callback is None:
if self._wait_timeout is None and self._callback is None:
self._finish_process()
else:
if self.__callback:
if self._callback:
self.__thread = Thread(
target=self.__callback,
target=self._callback,
args=(self,),
kwargs=self.__callback_kwargs or {},
kwargs=self._callback_kwargs or {},
)
else:
self.__thread = Thread(target=self._wait, args=(self.__wait,))
self.__thread = Thread(target=self._wait, args=(self._wait_timeout,))
self.__thread.start()

def _finish_process(self) -> None:
self.returncode = self.__returncode
self.returncode = self._returncode

self._finalize_streams()

Expand Down Expand Up @@ -335,16 +336,20 @@ async def communicate( # type: ignore

# feed eof one more time as streams were opened
self._finalize_streams()

self._finalize_thread(timeout)

await self._finalize(timeout)
return (
await self.stdout.read() if self.stdout else None,
await self.stderr.read() if self.stderr else None,
)

async def wait(self, timeout: Optional[float] = None) -> int: # type: ignore
return super().wait(timeout)
if timeout and self._wait_timeout and timeout < self._wait_timeout:
self._wait_timeout -= timeout
raise subprocess.TimeoutExpired(self.args, timeout)
await self._finalize(timeout)
if self.returncode is None:
raise exceptions.PluginInternalError
return self.returncode

def _get_empty_buffer(self, _: bool) -> asyncio.StreamReader:
return asyncio.StreamReader()
Expand All @@ -362,3 +367,34 @@ async def _reopen_stream(
fresh_stream.feed_data(data)
return fresh_stream
return None

def run_thread(self) -> None:
"""Async impl should not contain any thread based implementation"""

def evaluate(self) -> None:
"""Check if process needs to be finished."""
if self._wait_timeout is None and self._callback is None:
self._finish_process()

async def _run_callback_in_executor(self) -> None:
"""Run in executor the user-defined callback or wait."""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
if self._callback:
kwargs = self._callback_kwargs or {}
cbk = partial(self._callback, **kwargs)
await loop.run_in_executor(pool, cbk, self)
elif self._wait_timeout is not None:
await loop.run_in_executor(pool, self._wait, self._wait_timeout)

async def _finalize(self, timeout: Optional[float] = None) -> None:
"""Run the user-defined callback or wait. Finish process"""
if self.returncode is not None:
return
if timeout is not None:
await asyncio.wait_for(self._run_callback_in_executor(), timeout=timeout)
else:
await self._run_callback_in_executor()
if self.returncode is None:
self.returncode = self._returncode
self._finalize_streams()
2 changes: 1 addition & 1 deletion pytest_subprocess/process_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async def _call_async(
result = cls._prepare_instance(AsyncFakePopen, command, kwargs, process)
if not isinstance(result, AsyncFakePopen):
raise exceptions.PluginInternalError
result.run_thread()
result.evaluate()
return result

@classmethod
Expand Down
36 changes: 28 additions & 8 deletions tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,7 @@ async def test_popen_recorder(fp):
pytest.param(None, id="no-callback"),
pytest.param(
lambda process: process,
id="noop-callback-causes-infinite-loop",
marks=pytest.mark.xfail(
strict=True, raises=asyncio.TimeoutError, reason="Github #120"
),
id="with-callback",
),
],
)
Expand All @@ -353,15 +350,38 @@ async def my_async_func():
stderr=asyncio.subprocess.PIPE,
)
await process.wait()

# This reads forever when passing a callback to fp.register
# Add a timeout to abort test when condition occurs.
return await asyncio.wait_for(process.stdout.read(), timeout=1)
return await process.stdout.read()

fp.register(["test"], stdout=b"fizz", callback=callback)
assert await my_async_func() == b"fizz"


@pytest.mark.asyncio
async def test_asyncio_subprocess_using_communicate_with_callback_kwargs(fp):
expected_some_value = 2

def cbk(fake_obj, some_value=None):
assert expected_some_value == some_value
return fake_obj

async def my_async_func():
process = await asyncio.create_subprocess_exec(
"test",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out, _ = await process.communicate()
return out

fp.register(
["test"],
stdout=b"fizz",
callback=cbk,
callback_kwargs={"some_value": expected_some_value},
)
assert await my_async_func() == b"fizz"


@pytest.fixture(autouse=True)
def skip_on_pypy():
"""Async test for some reason crash on pypy 3.6 on Windows"""
Expand Down

0 comments on commit 570fbaf

Please sign in to comment.