P2P Be robust to connection timeouts during shuffle_receive #8011
When adding retries, we should make sure the retried code also checks whether the shuffle run has been closed.
After merging #7698, P2P shuffle tests have become very flaky. I've tracked this down to a change in the order in which the scheduler is informed about the worker leaving and the listeners are stopped. This now causes the shuffle tasks to fail with a comm error. As a result, implementing a retry mechanism has become increasingly important. @charlesbluca - let's coordinate on this!
I could see us also working around #7698 by warming up our connection pools before the shuffle commences. Something like:

```python
import asyncio


class P2PWorkerPlugin:
    async def warmup_comms(self):
        # Ping every peer once so the connection pool is already warm
        # before any shards are transferred.
        await asyncio.gather(
            *(self.rpc(w).ping() for w in self.workers)
        )
```

Maybe the scheduler initiates this during shuffle init, idk. As long as we have fewer than 512 workers (that's the size of the comm pool), this will likely be a good thing and avoid having transfer tasks fail due to a connection error. The problem still exists, of course, but this way most users will likely not be impacted.
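For completeness, a hypothetical sketch of how such a plugin could be registered from the client side; the plugin name and wiring are illustrative, not the actual P2P extension API:

```python
from distributed import Client

client = Client()  # assumes a running cluster

# Register the (hypothetical) warm-up plugin on every worker; in practice this
# would be wired into the P2P shuffle extension rather than done by hand.
client.register_worker_plugin(P2PWorkerPlugin(), name="p2p-comm-warmup")
```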
@hendrikmakait can you elaborate on the ordering during shutdown that is leading to this? My intuition says that stopping the listener right away, as done in #7698, is the correct way, and I wonder if we have to change other actions to make this more consistent, or if my intuition is wrong.
Thought about this briefly again. I'm not entirely sure this is necessary. I'd expect the remote to either ignore the pushed data or even raise a proper exception if the shuffle was already closed. Is it actually necessary to add more logic on the sender side? Just to be clear, I think we should only retry on comm errors.
I've thought of this as an early-stopping mechanism for the retries. If the shuffle has been closed (e.g., because a remote worker dropped), there's no point in retrying until we've maxed out whatever retry limits we've set; that worker won't respond anymore.
The problem is that by stopping the listeners, RPCs start throwing errors. Remotes using RPC calls (notably P2P's send buffers and the barrier) start receiving these errors and raise them, which causes the P2P tasks to fail.

For the scheduler to remove the worker, the batched stream between them needs to be closed. The worker won't close that stream until later in its shutdown, so there's now plenty of time for these RPC errors to surface and fail tasks first.

Previously, the worker would have stopped its listeners only after it had closed the batched stream to the scheduler. So in almost all scenarios, the scheduler would have removed the worker and caused the shuffle to restart before any of these errors reached the remotes.
TL;DR: I think it's generally a good idea to close listeners early and not let them potentially interfere with shutdown. The problem is just that it adds a race condition we haven't covered in P2P restarts so far. The most holistic way of solving this is handling the errors this would cause on the receiving end in P2P. The easiest approach should be retrying with some generously configured limits to ensure that there will never be a deadlock. Checking whether we can stop retrying because the shuffle has been closed (e.g., due to a remote worker leaving) seems like a worthwhile optimization but should not be strictly necessary; cancelled tasks would just block workers longer than necessary.
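As a rough illustration of that approach, here is a minimal sketch of a sender-side retry with early stopping. The names `send_shards`, `shuffle_closed`, and the retry limits are assumptions for illustration, not the actual P2P internals:

```python
import asyncio

from distributed.comm import CommClosedError


async def send_with_retry(send_shards, shuffle_closed, *, attempts=5, delay=1.0):
    """Retry a shard transfer on comm errors, bailing out early once the
    shuffle run has been closed (e.g. because a remote worker left)."""
    for attempt in range(attempts):
        if shuffle_closed():
            # Early stop: the run is gone and the shuffle will be restarted
            # anyway, so further retries would only block the worker.
            raise RuntimeError("Shuffle run closed; giving up on retries")
        try:
            return await send_shards()
        except (OSError, CommClosedError):
            if attempt == attempts - 1:
                raise
            # Generous but bounded back-off so this can never deadlock.
            await asyncio.sleep(delay * (attempt + 1))
```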
This problem has been solved more holistically (see #8088).
When sending shards, we currently rely on `distributed.comm.timeouts.connect` being sufficiently large to establish a connection in case of a cold CommPool. However, if the remote is struggling with a blocked event loop, this timeout can be too short, particularly as long as #7698 is not fixed.
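For reference, this is a regular Dask config value; raising it is only a stopgap for the problem described above, and the value below is arbitrary, but it shows the knob in question:

```python
import dask

# Give cold connections more time to be established; this only papers over
# the underlying problem rather than fixing it.
dask.config.set({"distributed.comm.timeouts.connect": "60s"})
```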
We could consider retrying the send on CommErrors. If the remote were dead, the shuffle extension would fail the tasks anyway, so I believe we can be arbitrarily generous with retries during this operation.
Referenced code: distributed/shuffle/_worker_plugin.py, lines 128–132 at commit 2be7f35.