Fix send race [debug] #1786
Conversation
Force-pushed from b3b7df9 to d42afa5.
unconditionally mark predictions as cancelled without waiting for cancellation to succeed

while I struggle with #1786, we see queue spikes because not responding to cancellation promptly causes the pod to get restarted. this is a dirty hack to pretend like cancellation works immediately. as soon as we fix the race condition (and possibly any issues with task.cancel() behaving differently from signal handlers), we can drop this.

Signed-off-by: technillogue <technillogue@gmail.com>
Force-pushed from e7a5ff4 to 4982b97.
This is looking good! I think if we can get tests passing (integration got stuck on the last commit, and I cancelled after 38 minutes) and clean up the debug statements / comments, this should be ready to go.
if full_line.endswith(self.terminate_token):
    full_line = full_line[: -len(self.terminate_token)]
    should_exit = True

if full_line.endswith(self.drain_token):
    drain_tokens_seen += 1
    full_line = full_line[: -len(self.drain_token)]

if full_line:
    # Call write_hook from the main thread
    self._write_hook(stream.name, stream.original, full_line + "\n")
Worth profiling, but precomputed/fixed token lengths instead of a string suffix check and len() could be noticeably faster if this is a hot code path.
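A minimal sketch of that suggestion, using hypothetical names that mirror the snippet above rather than the actual cog classes: the token lengths are computed once up front so the per-line loop avoids repeated len() calls.

```python
class TokenStripper:
    """Illustrative only: strip sentinel tokens using precomputed lengths."""

    def __init__(self, terminate_token: str, drain_token: str) -> None:
        self.terminate_token = terminate_token
        self.drain_token = drain_token
        # computed once, reused for every line processed
        self._terminate_len = len(terminate_token)
        self._drain_len = len(drain_token)

    def strip(self, full_line: str) -> tuple[str, bool, bool]:
        should_exit = False
        saw_drain_token = False
        if full_line.endswith(self.terminate_token):
            full_line = full_line[: -self._terminate_len]
            should_exit = True
        if full_line.endswith(self.drain_token):
            saw_drain_token = True
            full_line = full_line[: -self._drain_len]
        return full_line, should_exit, saw_drain_token
```

Whether this is measurably faster than calling len() on the token each time is exactly the kind of thing the suggested profiling would settle.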
python/cog/server/worker.py (Outdated)
async def setup_async() -> None:
    # we prefer to not stop-start the event loop between these calls
    await self._async_init()  # this creates tasks
    debug("async_init done")
    await run_setup_async(self._predictor)
    debug("ran setup async")
not sure if this is still needed
Repeating back to confirm my own understanding: we no longer need setup_async, because our new approach is to always run (sync) setup and then transition to async at the end, after we know the signature of predict. Right?
no, setup may be async -- what I mean here is that
-async def setup_async() -> None:
-    # we prefer to not stop-start the event loop between these calls
-    await self._async_init()  # this creates tasks
-    debug("async_init done")
-    await run_setup_async(self._predictor)
-    debug("ran setup async")
+self.loop.run_until_complete(self._async_init())
+self.loop.run_until_complete(run_setup_async(self._predictor))
I added the setup_async function there so that the event loop kept going between the two coros because I thought that was causing problems, but I'm not sure that's necessary
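For context, a standalone sketch of the two patterns being compared here, with placeholder coroutines rather than the worker code: a wrapper coroutine keeps the event loop running across both awaits, while two separate run_until_complete calls stop and restart the loop in between. Either way, tasks created by the first call belong to the same loop as long as it is not closed.

```python
import asyncio

async def async_init() -> None:
    # stand-in for self._async_init(); imagine it creates background tasks
    await asyncio.sleep(0)

async def run_setup() -> None:
    # stand-in for run_setup_async(self._predictor)
    await asyncio.sleep(0)

loop = asyncio.new_event_loop()

# Pattern 1: a wrapper coroutine, so the loop runs continuously across both awaits
async def setup_async() -> None:
    await async_init()
    await run_setup()

loop.run_until_complete(setup_async())

# Pattern 2: two separate calls; the loop stops after the first call
# and is restarted for the second
loop.run_until_complete(async_init())
loop.run_until_complete(run_setup())

loop.close()
```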
Force-pushed from 96f3da1 to 1bff4ee.
* unconditionally mark predictions as cancelled without waiting for cancellation to succeed

  while I struggle with #1786, we see queue spikes because not responding to cancellation promptly causes the pod to get restarted. this is a dirty hack to pretend like cancellation works immediately. as soon as we fix the race condition (and possibly any issues with task.cancel() behaving differently from signal handlers), we can drop this.

  Signed-off-by: technillogue <technillogue@gmail.com>

* make Mux.write a sync method

  this makes the cancel patch cleaner, but might be a small speedup for high-throughput outputs

  Signed-off-by: technillogue <technillogue@gmail.com>

* gate this behind a feature flag

---------

Signed-off-by: technillogue <technillogue@gmail.com>
previously, AsyncConnection would only be used in _loop_async, and _events.send would always be used, which usually immediately makes a write(2) syscall. in contrast, AsyncConnection has a StreamWriter, which should be safe to call from different coroutines.
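A self-contained sketch of the pattern being described, not the cog code (the local server, coroutine names, and message format are made up): several coroutines share one asyncio StreamWriter, whose writes go into the transport's internal buffer rather than each caller issuing its own write(2).

```python
import asyncio

async def send_events(writer: asyncio.StreamWriter, name: str) -> None:
    # several coroutines write to the same StreamWriter; write() appends to
    # the transport's buffer instead of making a syscall directly
    for i in range(3):
        writer.write(f"{name}:{i}\n".encode())
        await writer.drain()

async def main() -> None:
    received: asyncio.Future[bytes] = asyncio.get_running_loop().create_future()

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        received.set_result(await reader.read())  # read until the client closes
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()
    _, writer = await asyncio.open_connection(host, port)

    # two coroutines writing to the same StreamWriter concurrently
    await asyncio.gather(
        send_events(writer, "logs"),
        send_events(writer, "outputs"),
    )
    writer.close()
    await writer.wait_closed()

    print((await received).decode(), end="")
    server.close()
    await server.wait_closed()

asyncio.run(main())
```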
Force-pushed from 1bff4ee to 90f3f07.
Force-pushed from 00c530a to a068801.
Closing in favor of #1831
This solves a similar problem as #1758: currently, emitting a metric or yielding an output can in some cases result in an EAGAIN error, and we believe Done events are sometimes dropped. In async land, we shouldn't need a lock; multiple writers should be able to write to a StreamWriter safely. We also keep a sync Lock for when the predictor is not async.

However, I'm still not sure if StreamWriter is thread-safe.
After some research: StreamWriter.write calls self._transport.write, and when the StreamWriter wraps a unix socket with the default selector, the _transport is a _SelectorSocketTransport, which calls [self._buffer.extend](https://github.com/python/cpython/blob/3.11/Lib/asyncio/selector_events.py#L1077), where self._buffer is a [bytearray](https://github.com/python/cpython/blob/3.11/Lib/asyncio/selector_events.py#L761). However, on 3.12, [append is called](https://github.com/python/cpython/blob/3.12/Lib/asyncio/selector_events.py#L1091) instead of extend.

After more soul-searching, I think write should be basically thread-safe: the two critical calls, socket.send() and bytearray.append/extend, should each be a single bytecode instruction plus native code that operates on Python objects and does not release the GIL, and as far as I understand, Python threads are only switched "between bytecodes". Even though the individual calls are thread-safe, the overall write method is not quite thread-safe: a relevant thread switch could occur between the `if not self._buffer`, `_sock.send`, and `_buffer.extend` lines. In that case, though, the race would only mean data being sent out of order or incorrectly delayed/sent early; I think it shouldn't be corrupted, and we don't really care about the ordering of log lines vs outputs.

I can see two solutions:
1. `await wrapped_stream.read()` so that threads are not necessary. There's a tricky moment where we need to use the threaded StreamRedirector to capture logs while the predictor is being imported, since we don't know yet whether we're going to be async (a rough sketch of this approach follows below).
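A rough sketch of the first option, under the assumption that the captured stream is exposed as the read end of a pipe; the names redirect_pipe and write_hook are placeholders rather than the actual cog interfaces. The pipe is attached to the running event loop and lines are consumed in a coroutine, so no redirector thread is needed.

```python
import asyncio
import os

async def redirect_pipe(read_fd: int, name: str, write_hook) -> None:
    """Consume lines from a pipe inside the event loop instead of a reader thread."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    # attach the read end of the pipe to the running event loop
    await loop.connect_read_pipe(lambda: protocol, os.fdopen(read_fd, "rb"))
    while True:
        line = await reader.readline()
        if not line:  # EOF: the write end of the pipe was closed
            break
        write_hook(name, line.decode())

async def main() -> None:
    read_fd, write_fd = os.pipe()
    task = asyncio.create_task(
        redirect_pipe(read_fd, "stdout", lambda name, line: print(f"[{name}] {line}", end=""))
    )
    # stand-in for the predictor writing to its redirected stdout
    with os.fdopen(write_fd, "w") as w:
        w.write("hello from the predictor\n")
    await task

asyncio.run(main())
```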