Skip to content

Mux prediction events #1405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6291c73
race utility for racing awaitables
technillogue Dec 7, 2023
6da64f0
start mux
technillogue Nov 29, 2023
ff06d76
tag events with id, read pipe in a task, get events from mux
technillogue Nov 29, 2023
bb4e3d9
use async pipe for async child loop
technillogue Nov 29, 2023
5cbecae
_shutting_down vs _terminating
technillogue Dec 2, 2023
4893d77
race with shutdown event
technillogue Dec 7, 2023
5f4ef23
keep reading events during shutdown, but call terminate after the las…
technillogue Dec 4, 2023
017dc09
emit heartbeats from mux.read
technillogue Dec 1, 2023
89b972b
don't use _wait. instead, setup reads event from the mux too
technillogue Dec 1, 2023
1b35429
worker semaphore and prediction ctx
technillogue Jan 23, 2024
1bf2c9b
where _wait used to raise a fatal error, have _read_events set an err…
technillogue Jan 23, 2024
87ac9e9
fix event loop errors for <3.9
technillogue Dec 5, 2023
f8ccfd8
keep track of predictions in flight explicitly and use that to route …
technillogue Dec 5, 2023
b2a5fef
don't wait for executor shutdown
technillogue Dec 22, 2023
3bd794f
progress: check for cancelation in task done_handler
technillogue Jan 18, 2024
b0a526b
let mux check if child is alive and set mux shutdown after leaving re…
technillogue Jan 19, 2024
925fe5e
close pipe when exiting
technillogue Jan 19, 2024
c2a075d
predict requires IDLE or PROCESSING
technillogue Jan 23, 2024
24bf187
idk, try adding a BUSY state distinct from PROCESSING when we no long…
technillogue Jan 23, 2024
aad18e2
move resetting events to setup() instead of _read_events()
technillogue Jan 25, 2024
ec19c1e
state_from_predictions_in_flight instead of checking the value of sem…
technillogue Feb 2, 2024
6f52b94
make prediction_ctx "private"
technillogue Feb 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/cog/server/eventtypes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import secrets
from typing import Any, Dict

from attrs import define, field, validators
Expand All @@ -8,6 +9,7 @@
@define
class PredictionInput:
    """Event carrying the input payload for a prediction, tagged with an id."""

    # String-keyed input values; the schema is not constrained here.
    payload: Dict[str, Any]
    # Identifier for this input. When the caller does not supply one, a fresh
    # random token of 4 bytes (8 hex characters) is generated per instance.
    # NOTE(review): presumably used to route events back to the matching
    # prediction — confirm against the worker/mux code.
    id: str = field(factory=lambda: secrets.token_hex(4))


@define
Expand Down
48 changes: 45 additions & 3 deletions python/cog/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import io
import os
import selectors
import sys
import threading
import uuid
from multiprocessing.connection import Connection
from typing import (
Any,
Callable,
Coroutine,
Generic,
Optional,
Sequence,
TextIO,
TypeVar,
Union,
)


Expand Down Expand Up @@ -160,13 +163,44 @@ def run(self) -> None:
self.drain_event.set()
drain_tokens_seen = 0


X = TypeVar("X")
Y = TypeVar("Y")


async def race(
    x: Coroutine[None, None, X],
    y: Coroutine[None, None, Y],
    timeout: Optional[float] = None,
) -> Union[X, Y]:
    """Run two coroutines concurrently and return the result of the first to finish.

    Both coroutines are wrapped in tasks. Whichever finishes first supplies the
    return value (or raises its exception); the loser is cancelled. If both
    finish in the same event-loop iteration, ``x`` wins over ``y``.

    Args:
        x: first coroutine; takes priority on a tie.
        y: second coroutine.
        timeout: maximum number of seconds to wait, or ``None`` to wait forever.

    Returns:
        The result of the winning coroutine.

    Raises:
        TimeoutError: neither coroutine finished within ``timeout``.
        asyncio.CancelledError: ``race`` itself was cancelled; both tasks are
            cancelled before the error propagates.
        Exception: whatever the winning coroutine raised.
    """
    tasks = [asyncio.create_task(x), asyncio.create_task(y)]
    try:
        done, pending = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
    except asyncio.CancelledError:
        # asyncio.wait() does not cancel what it waits on, and nobody else
        # holds a reference to these tasks, so clean them up ourselves.
        for task in tasks:
            task.cancel()
        raise
    for task in pending:
        task.cancel()
    if not done:
        raise TimeoutError
    # done is an unordered set but we want to preserve original order
    result_task, *others = (t for t in tasks if t in done)
    # during shutdown, some of the other completed tasks might be an error.
    # Task.cancel() is a no-op on a task that has already finished, so to
    # really avoid the warning "Task exception was never retrieved" we have
    # to retrieve (and deliberately drop) the exception.
    for task in others:
        msg = "was completed at the same time as another selected task, canceling"
        # FIXME: use a logger?
        print(task, msg, file=sys.stderr)
        if not task.cancelled():
            task.exception()
    return result_task.result()


# functionally this is the exact same thing as aioprocessing but 0.1% the code
# however it's still worse than just using actual asynchronous io
class AsyncPipe(Generic[X]):
def __init__(self, conn: Connection) -> None:
def __init__(
self, conn: Connection, alive: Callable[[], bool] = lambda: True
) -> None:
self.conn = conn
self.alive = alive
self.exiting = threading.Event()
self.executor = concurrent.futures.ThreadPoolExecutor(1)

Expand All @@ -175,7 +209,7 @@ def send(self, obj: Any) -> None:

def shutdown(self) -> None:
self.exiting.set()
self.executor.shutdown(wait=False)
self.executor.shutdown(wait=True)
# if we ever need cancel_futures (introduced 3.9), we can copy it in from
# https://github.com/python/cpython/blob/3.11/Lib/concurrent/futures/thread.py#L216-L235

Expand All @@ -185,12 +219,20 @@ def poll(self, timeout: float = 0.0) -> bool:
def _recv(self) -> Optional[X]:
# this ugly mess could easily be avoided with loop.connect_read_pipe
# even loop.add_reader would help but we don't want to mess with a thread-local loop
while not self.exiting.is_set():
while not self.exiting.is_set() and not self.conn.closed and self.alive():
if self.conn.poll(0.01):
if self.conn.closed or not self.alive():
print("caught conn closed or unalive")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I know what situation this is catching, but do we actually want to print this to STDOUT when it happens?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a clear idea of why this is necessary, but if I change it like this

diff --git a/python/cog/server/helpers.py b/python/cog/server/helpers.py
index 220537a..ad8208d 100644
--- a/python/cog/server/helpers.py
+++ b/python/cog/server/helpers.py
@@ -219,7 +219,7 @@ class AsyncPipe(Generic[X]):
     def _recv(self) -> Optional[X]:
         # this ugly mess could easily be avoided with loop.connect_read_pipe
         # even loop.add_reader would help but we don't want to mess with a thread-local loop
-        while not self.exiting.is_set() and not self.conn.closed and self.alive():
+        while not self.exiting.is_set() and not self.conn.closed:  # and self.alive():
             if self.conn.poll(0.01):
                 if self.conn.closed or not self.alive():
                     print("caught conn closed or unalive")
diff --git a/python/cog/server/worker.py b/python/cog/server/worker.py
index 2e5f6a7..c100e81 100644
--- a/python/cog/server/worker.py
+++ b/python/cog/server/worker.py
@@ -260,6 +260,8 @@ class Worker:
             await self._mux.write(id, event)
         # If we dropped off the end off the end of the loop, check if it's
         # because the child process died.
+        if not self._child.is_alive():
+            self._events.shutdown()
         if not self._child.is_alive() and not self._terminating.is_set():
             exitcode = self._child.exitcode
             self._mux.fatal = FatalWorkerException(

then test_fatalworkerexception_from_irrecoverable_failures will deadlock. I don't fully understand why this is the case. I think it might be because _read_events blocks on coro_recv_with_exit, so it never gets to check is_alive. You could change that by adding a timeout and doing more poll-like behavior, but we already have a loop doing that inside AsyncPipe, so I feel like we may as well keep the check there. I'm willing to change this, though.

return
return self.conn.recv()
return None

async def coro_recv(self) -> Optional[X]:
    """Await the next object from the pipe without blocking the event loop.

    The blocking ``self._recv`` call is handed to ``self.executor`` and its
    return value is passed through unchanged, so this returns whatever
    ``_recv`` returns (including ``None``).
    """
    loop = asyncio.get_running_loop()
    # believe it or not this can still deadlock!
    return await loop.run_in_executor(self.executor, self._recv)

async def coro_recv_with_exit(self, exit: asyncio.Event) -> Optional[X]:
    """Receive the next object, but stop waiting as soon as ``exit`` is set.

    Races ``coro_recv()`` against ``exit.wait()``. Returns the received
    object when the receive wins, and ``None`` when the exit event wins.
    """
    result = await race(self.coro_recv(), exit.wait())
    # NOTE(review): ``True`` doubles as the "exit fired" sentinel, so a
    # received object that is literally ``True`` would also be dropped here —
    # confirm no sender ever puts a bare ``True`` on the pipe.
    if result is not True: # wait() would return True
        return result
    # Falling off the end returns None implicitly (exit event won the race).
Loading