
Fix send race [debug] #1786


Closed
wants to merge 10 commits into from

Conversation

@technillogue (Contributor) commented Jul 3, 2024

This solves a similar problem to #1758: currently, emitting a metric or yielding an output can in some cases result in an EAGAIN error, and we believe Done events are sometimes dropped. In async land, we shouldn't need a lock; multiple writers should be able to write to a StreamWriter safely. We keep a sync Lock for the case where the predictor is not async.

However, I'm still not sure whether StreamWriter is thread-safe. After some research: StreamWriter.write calls self._transport.write, and when the StreamWriter wraps a Unix socket with the default selector, _transport is a _SelectorSocketTransport, which calls [self._buffer.extend](https://github.com/python/cpython/blob/3.11/Lib/asyncio/selector_events.py#L1077), where self._buffer is a [bytearray](https://github.com/python/cpython/blob/3.11/Lib/asyncio/selector_events.py#L761). On 3.12, however, [append is called](https://github.com/python/cpython/blob/3.12/Lib/asyncio/selector_events.py#L1091) instead of extend.

After more soul-searching, I think write should be basically thread-safe: the two critical calls, socket.send() and bytearray.append/extend, should each amount to a single bytecode instruction plus native code that works on Python objects and does not release the GIL. As far as I understand, Python threads are only switched "between bytecodes". Even though the individual calls are thread-safe, the overall write method is not quite thread-safe: a relevant thread switch could occur between the if not self._buffer check, the _sock.send call, and the _buffer.extend line. In that case, though, the race would only result in data being sent out of order or incorrectly delayed/sent early; I think it shouldn't be corrupted, and we don't really care about the ordering of log lines vs outputs.
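For context, here's a simplified paraphrase of the write path described above (not the exact CPython source; error handling omitted, comments are mine), marking where a thread switch could interleave two writers:

```python
# Simplified paraphrase of asyncio's _SelectorSocketTransport.write
# (CPython 3.11/3.12); error handling omitted, comments are mine.
def write(self, data):
    if not self._buffer:                  # (1) buffer empty: try an eager send
        n = self._sock.send(data)         # (2) single call into native code
        data = data[n:]
        if not data:
            return
        # couldn't send everything: watch the fd for writability
        self._loop._add_writer(self._sock_fd, self._write_ready)
    self._buffer.extend(data)             # (3) 3.11 extends a bytearray; 3.12 appends (see links above)

# Each of (2) and (3) looks atomic from Python code, but a thread switch
# between (1), (2), and (3) can interleave two writers, so data may be sent
# out of order or delayed -- the concern described above.
```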

I can see two solutions:

  1. Ideally, we would add an alternate implementation of StreamWriter that uses the main event loop: wrap stderr/stdout in StreamReaders, start a separate task for each stream, and use await wrapped_stream.read() so that threads are not necessary. There's a tricky moment where we still need the threaded StreamRedirector to capture logs while the predictor is being imported, since at that point we don't know whether we're going to be async or not.
  2. Alternatively, we could use a queue or deque to communicate between threads: instead of calling stream_write_hook, StreamRedirector would do queue.put, and the main event loop would do queue.get and not worry about thread safety for the pipe (see the sketch after this list). The problem with this is that asyncio.Queue is not thread-safe and queue.Queue could block the event loop. You could use a busy wait or similar with get_nowait, but that has other downsides.
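As an illustration of option 2 (a sketch only, not code from this PR; the producer thread and the consuming loop below are hypothetical stand-ins for StreamRedirector and the event-loop side), the usual way to hand items from a plain thread to an asyncio.Queue without blocking the event loop is loop.call_soon_threadsafe:

```python
import asyncio
import threading


async def main() -> None:
    loop = asyncio.get_running_loop()
    queue: "asyncio.Queue[str]" = asyncio.Queue()

    def redirector_thread() -> None:
        # Runs in an ordinary thread (where the pipe reading happens).
        # asyncio.Queue is not thread-safe, so schedule the put on the loop
        # instead of calling queue.put_nowait directly from this thread.
        for i in range(3):
            loop.call_soon_threadsafe(queue.put_nowait, f"captured line {i}\n")

    threading.Thread(target=redirector_thread).start()

    # The event-loop side consumes lines with a plain await -- no busy wait,
    # and no queue.Queue call blocking the loop.
    for _ in range(3):
        line = await queue.get()
        print("got:", line, end="")


asyncio.run(main())
```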

@technillogue requested a review from a team July 3, 2024 23:13
@technillogue force-pushed the syl/fix-send-race-async branch from b3b7df9 to d42afa5 on July 11, 2024 21:35
technillogue added a commit that referenced this pull request Jul 11, 2024
…cellation to succeed

while I struggle with #1786, we see queue spikes because not responding to cancellation promptly causes the pod to get restarted. this is a dirty hack to pretend like cancellation works immediately. as soon as we fix the race condition (and possibly any issues with task.cancel() behaving differently from signal handlers), we can drop this.
technillogue added a commit that referenced this pull request Jul 12, 2024
…cellation to succeed
technillogue added a commit that referenced this pull request Jul 15, 2024
…cellation to succeed
@technillogue force-pushed the syl/fix-send-race-async branch 2 times, most recently from e7a5ff4 to 4982b97 on July 17, 2024 18:33
@mattt (Contributor) left a comment

This is looking good! I think if we can get tests passing (integration got stuck on the last commit, and I cancelled after 38 minutes) and clean up the debug statements / comments, this should be ready to go.

Comment on lines +260 to +270
if full_line.endswith(self.terminate_token):
    full_line = full_line[: -len(self.terminate_token)]
    should_exit = True

if full_line.endswith(self.drain_token):
    drain_tokens_seen += 1
    full_line = full_line[: -len(self.drain_token)]

if full_line:
    # Call write_hook from the main thread
    self._write_hook(stream.name, stream.original, full_line + "\n")
Contributor:

Worth profiling, but precomputed / fixed lengths instead of a string suffix and len could be noticeably faster if this is a hot code path.
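For illustration, a minimal sketch of what that might look like (assuming the tokens are fixed strings; the class and method names here are hypothetical, not from this PR):

```python
class TokenStripper:
    """Sketch: precompute token lengths once instead of calling len() per line."""

    def __init__(self, terminate_token: str, drain_token: str) -> None:
        self.terminate_token = terminate_token
        self.drain_token = drain_token
        # Computed once, outside the hot per-line loop.
        self._terminate_len = len(terminate_token)
        self._drain_len = len(drain_token)

    def strip(self, full_line: str) -> tuple[str, bool, bool]:
        should_exit = False
        drain_seen = False
        if full_line.endswith(self.terminate_token):
            full_line = full_line[: -self._terminate_len]
            should_exit = True
        if full_line.endswith(self.drain_token):
            drain_seen = True
            full_line = full_line[: -self._drain_len]
        return full_line, should_exit, drain_seen
```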

Comment on lines 195 to 200
async def setup_async() -> None:
    # we prefer to not stop-start the event loop between these calls
    await self._async_init()  # this creates tasks
    debug("async_init done")
    await run_setup_async(self._predictor)
    debug("ran setup async")
technillogue (Contributor Author):

not sure if this is still needed

Contributor:

Repeating back to confirm my own understanding: We no longer need setup_async, because our new approach is to always run (sync) setup and then transition to async at the end after we know the signature of predict. Right?

technillogue (Contributor Author):

no, setup may be async -- what I mean here is that

Suggested change: replace

async def setup_async() -> None:
    # we prefer to not stop-start the event loop between these calls
    await self._async_init()  # this creates tasks
    debug("async_init done")
    await run_setup_async(self._predictor)
    debug("ran setup async")

with

self.loop.run_until_complete(self._async_init())
self.loop.run_until_complete(run_setup_async(self._predictor))

I added the setup_async function there so that the event loop kept running between the two coros, because I thought stopping and starting it was causing problems, but I'm not sure that's actually necessary.

@technillogue force-pushed the syl/fix-send-race-async branch 3 times, most recently from 96f3da1 to 1bff4ee on July 19, 2024 00:05
technillogue added a commit that referenced this pull request Jul 22, 2024
…cellation to succeed
technillogue added a commit that referenced this pull request Jul 22, 2024
* unconditionally mark predictions as cancelled without waiting for cancellation to succeed

while I struggle with #1786, we see queue spikes because not responding to cancellation promptly causes the pod to get restarted. this is a dirty hack to pretend like cancellation works immediately. as soon as we fix the race condition (and possibly any issues with task.cancel() behaving differently from signal handlers), we can drop this.

Signed-off-by: technillogue <technillogue@gmail.com>

* make Mux.write a sync method

this makes the cancel patch cleaner, but might be a small speedup for high-throughput outputs

Signed-off-by: technillogue <technillogue@gmail.com>

* gate this behind a feature flag

---------

Signed-off-by: technillogue <technillogue@gmail.com>
@technillogue (Contributor Author) commented:

Previously, AsyncConnection would only be used in _loop_async, and _events.send would always be used, which usually makes a write(2) syscall immediately. In contrast, AsyncConnection has a StreamWriter, which should be safe to call from different coroutines.
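To illustrate why no lock is needed on the async path (a standalone sketch, not code from this PR; the cat subprocess is just a stand-in for the events pipe), several coroutines on one event loop can share a StreamWriter, since write() only appends to the transport's buffer:

```python
import asyncio


async def main() -> None:
    # A subprocess pipe gives us a real StreamWriter to share between coroutines.
    proc = await asyncio.create_subprocess_exec(
        "cat", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.DEVNULL
    )
    writer = proc.stdin
    assert writer is not None

    async def emit(tag: str) -> None:
        for i in range(100):
            # write() just buffers on the transport; all writers run on the
            # same event loop, so no lock is required.
            writer.write(f"{tag} {i}\n".encode())
            await asyncio.sleep(0)  # yield so the coroutines interleave

    await asyncio.gather(emit("metric"), emit("output"), emit("done"))
    await writer.drain()
    writer.close()
    await proc.wait()


asyncio.run(main())
```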
@technillogue force-pushed the syl/fix-send-race-async branch from 00c530a to a068801 on July 27, 2024 18:23
@mattt (Contributor) commented Jul 31, 2024

Closing in favor of #1831

@mattt closed this Jul 31, 2024
@technillogue changed the title from Fix send race to Fix send race [debug] on Oct 22, 2024
@nickstenning deleted the syl/fix-send-race-async branch February 11, 2025 10:09