-
Notifications
You must be signed in to change notification settings - Fork 617
Mux prediction events #1405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mux prediction events #1405
Changes from all commits
6291c73
6da64f0
ff06d76
bb4e3d9
5cbecae
4893d77
5f4ef23
017dc09
89b972b
1b35429
1bf2c9b
87ac9e9
f8ccfd8
b2a5fef
3bd794f
b0a526b
925fe5e
c2a075d
24bf187
aad18e2
ec19c1e
6f52b94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,17 +3,20 @@ | |
import io | ||
import os | ||
import selectors | ||
import sys | ||
import threading | ||
import uuid | ||
from multiprocessing.connection import Connection | ||
from typing import ( | ||
Any, | ||
Callable, | ||
Coroutine, | ||
Generic, | ||
Optional, | ||
Sequence, | ||
TextIO, | ||
TypeVar, | ||
Union, | ||
) | ||
|
||
|
||
|
@@ -160,13 +163,44 @@ def run(self) -> None: | |
self.drain_event.set() | ||
drain_tokens_seen = 0 | ||
|
||
|
||
X = TypeVar("X")
Y = TypeVar("Y")


async def race(
    x: Coroutine[None, None, X],
    y: Coroutine[None, None, Y],
    timeout: Optional[float] = None,
) -> Union[X, Y]:
    """Run two coroutines concurrently and return the result of the first to finish.

    Both coroutines are wrapped in tasks. Whichever is still pending once the
    wait returns is cancelled. If both finished in the same wait, the result
    of ``x`` takes precedence over ``y`` (argument order is preserved).

    Args:
        x: first coroutine to run.
        y: second coroutine to run.
        timeout: maximum number of seconds to wait, or None to wait forever.

    Returns:
        The result of the winning coroutine.

    Raises:
        TimeoutError: if neither coroutine finished within ``timeout``.
        Exception: whatever the winning coroutine raised, re-raised as-is.
    """
    tasks = [asyncio.create_task(x), asyncio.create_task(y)]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    if not done:
        raise TimeoutError
    # done is an unordered set but we want to preserve original order
    result_task, *others = (t for t in tasks if t in done)
    # during shutdown, some of the other completed tasks might be an error.
    # cancel() is a no-op on a task that has already finished, so it cannot
    # silence "Task exception was never retrieved"; retrieving the exception
    # explicitly is what marks it as handled.
    for task in others:
        msg = (
            "was completed at the same time as another selected task, "
            "discarding its result"
        )
        # FIXME: use a logger?
        print(task, msg, file=sys.stderr)
        if not task.cancelled():
            task.exception()
    return result_task.result()
technillogue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
# functionally this is the exact same thing as aioprocessing but 0.1% the code | ||
# however it's still worse than just using actual asynchronous io | ||
class AsyncPipe(Generic[X]): | ||
def __init__(self, conn: Connection) -> None: | ||
def __init__( | ||
self, conn: Connection, alive: Callable[[], bool] = lambda: True | ||
) -> None: | ||
self.conn = conn | ||
self.alive = alive | ||
self.exiting = threading.Event() | ||
self.executor = concurrent.futures.ThreadPoolExecutor(1) | ||
|
||
|
@@ -175,7 +209,7 @@ def send(self, obj: Any) -> None: | |
|
||
def shutdown(self) -> None:
    """Stop the receive loop and wait for the executor thread to finish."""
    # flag the _recv polling loop to exit before joining its thread
    self.exiting.set()
    self.executor.shutdown(wait=True)
    # if we ever need cancel_futures (introduced 3.9), we can copy it in from
    # https://github.com/python/cpython/blob/3.11/Lib/concurrent/futures/thread.py#L216-L235
|
||
|
@@ -185,12 +219,20 @@ def poll(self, timeout: float = 0.0) -> bool: | |
def _recv(self) -> Optional[X]: | ||
# this ugly mess could easily be avoided with loop.connect_read_pipe | ||
# even loop.add_reader would help but we don't want to mess with a thread-local loop | ||
while not self.exiting.is_set(): | ||
while not self.exiting.is_set() and not self.conn.closed and self.alive(): | ||
if self.conn.poll(0.01): | ||
if self.conn.closed or not self.alive(): | ||
print("caught conn closed or unalive") | ||
Review comment: I'm not sure I know what situation this is catching, but do we actually want to print this to STDOUT when it happens?
Reply: I don't have a clear idea of why this is necessary, but if I change it like this:
diff --git a/python/cog/server/helpers.py b/python/cog/server/helpers.py
index 220537a..ad8208d 100644
--- a/python/cog/server/helpers.py
+++ b/python/cog/server/helpers.py
@@ -219,7 +219,7 @@ class AsyncPipe(Generic[X]):
def _recv(self) -> Optional[X]:
# this ugly mess could easily be avoided with loop.connect_read_pipe
# even loop.add_reader would help but we don't want to mess with a thread-local loop
- while not self.exiting.is_set() and not self.conn.closed and self.alive():
+ while not self.exiting.is_set() and not self.conn.closed: # and self.alive():
if self.conn.poll(0.01):
if self.conn.closed or not self.alive():
print("caught conn closed or unalive")
diff --git a/python/cog/server/worker.py b/python/cog/server/worker.py
index 2e5f6a7..c100e81 100644
--- a/python/cog/server/worker.py
+++ b/python/cog/server/worker.py
@@ -260,6 +260,8 @@ class Worker:
await self._mux.write(id, event)
# If we dropped off the end off the end of the loop, check if it's
# because the child process died.
+ if not self._child.is_alive():
+ self._events.shutdown()
if not self._child.is_alive() and not self._terminating.is_set():
exitcode = self._child.exitcode
self._mux.fatal = FatalWorkerException(
then test_fatalworkerexception_from_irrecoverable_failures will deadlock. I don't fully understand why this is the case. I think it might be because _read_events blocks on coro_recv_with_exit, so it doesn't get to check is_alive. You could change it by adding a timeout and doing more poll-like behavior, but we already have a loop doing that inside AsyncPipe, so I feel like we may as well keep it there. Willing to change this, though.
||
return | ||
return self.conn.recv() | ||
return None | ||
|
||
async def coro_recv(self) -> Optional[X]:
    """Await the next received object without blocking the event loop.

    The blocking ``_recv`` poll loop runs on this pipe's executor; the
    value it returns is passed through unchanged.
    """
    loop = asyncio.get_running_loop()
    # believe it or not this can still deadlock!
    return await loop.run_in_executor(self.executor, self._recv)
|
||
async def coro_recv_with_exit(self, exit: asyncio.Event) -> Optional[X]:
    """Receive the next object, giving up as soon as ``exit`` is set.

    Args:
        exit: event that aborts the wait when set.

    Returns:
        Whatever ``coro_recv`` produced, or None when the exit event won
        the race.
    """
    result = await race(self.coro_recv(), exit.wait())
    if result is True:  # Event.wait() returns True, so the exit event won
        return None
    return result
Uh oh!
There was an error while loading. Please reload this page.