
P2P Be robust to connection timeouts during shuffle_receive #8011

Closed
Tracked by #8043
fjetter opened this issue Jul 18, 2023 · 9 comments · Fixed by #8124
Labels: enhancement (Improve existing functionality or make things work better), good first issue (Clearly described and easy to accomplish. Good for beginners to the project.), networking, stability (Issue or feature related to cluster stability, e.g. deadlock)

Comments

@fjetter
Member

fjetter commented Jul 18, 2023

When sending shards, we currently rely on distributed.comm.timeouts.connect being large enough to establish a connection when the CommPool is cold.

However, if the remote is struggling with a blocked event loop, this timeout can be too short, particularly as long as #7698 is not fixed.
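As a stopgap while that isn't fixed, the timeout can be raised through the Dask config; a minimal sketch, with an illustrative value:

import dask

# Raise the connection timeout discussed above; "60s" is just an example value.
dask.config.set({"distributed.comm.timeouts.connect": "60s"})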

We could consider retrying the send on CommErrors. If the remote is dead, the shuffle extension will fail the tasks anyway, so I believe we can be arbitrarily generous with retries during this operation.

return await self.rpc(address).shuffle_receive(
data=to_serialize(shards),
shuffle_id=self.id,
run_id=self.run_id,
)
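A minimal sketch of what such a retry could look like, as a hypothetical wrapper around the call above (the name, attempt count, and backoff are illustrative, not the actual implementation):

import asyncio

async def _shuffle_receive_with_retry(self, address, shards, attempts=5, delay=0.5):
    # Illustrative only: retry the send on OSError (comm failures), backing
    # off exponentially between attempts. to_serialize / self.rpc are used as
    # in the snippet above.
    for attempt in range(attempts):
        try:
            return await self.rpc(address).shuffle_receive(
                data=to_serialize(shards),
                shuffle_id=self.id,
                run_id=self.run_id,
            )
        except OSError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(delay * 2**attempt)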

@fjetter added the enhancement, networking, good first issue, and stability labels on Jul 18, 2023
@hendrikmakait
Member

When adding retries, we should make sure the retried code also checks whether the shuffle run has been closed.

@hendrikmakait
Member

After merging #7698, P2P shuffle tests have become very flaky. I've tracked this down to a change in the order in which the scheduler is informed about the worker leaving and listeners are being stopped. This now causes the shuffle tasks to fail with a CommClosedError before the scheduler is able to restart the shuffle. As a result, the scheduler will think the tasks genuinely experienced a problem and will not restart them.

Consequently, implementing a retry mechanism has become increasingly important. @charlesbluca, let's coordinate on this!

@fjetter
Member Author

fjetter commented Jul 31, 2023

I could see us also work around #7698 by warming up our connection pools before the shuffle commences. Something like

import asyncio


class P2PWorkerPlugin:

    async def warmup_comms(self):
        # Establish a pooled connection to every participating worker up front
        # so the first shard transfer does not pay the connection cost.
        await asyncio.gather(
            *(self.rpc(w).ping() for w in self.workers)
        )

Maybe the scheduler initiates this during shuffle init. As long as we have fewer than 512 workers (that's the comm pool's connection limit), this will likely be a good thing and avoid having any transfer tasks fail due to a connection error. The problem still exists, of course, but this way most users will likely not be impacted.

@fjetter
Member Author

fjetter commented Jul 31, 2023

@hendrikmakait can you elaborate on the shutdown ordering that leads to this? My intuition says that stopping the listener right away, as done in #7698, is the correct approach, and I wonder whether we have to change other actions to make this more consistent, or whether my intuition is wrong.

@fjetter
Member Author

fjetter commented Jul 31, 2023

When adding retries, we should make sure the retried code also checks whether the shuffle run has been closed.

Thought about this briefly again. I'm not entirely sure this is necessary. I'd expect the remote to either ignore the pushed data or even raise a proper exception if the shuffle was already closed. Is it actually necessary to add more logic on the sender side?

Just to be clear, I think we should only retry on OSError.

@hendrikmakait
Member

Thought about this briefly again. I'm not entirely sure this is necessary. I'd expect the remote to either ignore the pushed data or even raise a proper exception if the shuffle was already closed. Is it actually necessary to add more logic on the sender side?

I've thought of this as an early stopping mechanism for the retries. If the shuffle has been closed (e.g., because a remote worker dropped), there's no point in retrying; we'd just keep going until we max out whatever retry limit we've set. That worker won't respond anymore.
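Concretely, in a retry loop like the one sketched earlier, this early stop could be a check before each attempt (self.closed is a hypothetical flag marking a closed shuffle run):

for attempt in range(attempts):
    # Hypothetical early stop: if the shuffle run was closed (e.g. because a
    # remote worker dropped), further retries cannot succeed.
    if self.closed:
        raise RuntimeError(f"shuffle {self.id} was closed, not retrying")
    ...  # attempt shuffle_receive and handle OSError as in the earlier sketch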

@hendrikmakait
Member

hendrikmakait commented Jul 31, 2023

As a result, the scheduler will think the tasks genuinely experienced a problem and will not restart them.

The problem is that once the listeners are stopped, RPC calls to the worker start throwing errors. Remotes making those RPC calls (notably P2P's send buffers and barrier) receive the errors and raise them, which causes P2P tasks to transition to erred. For all the scheduler knows, those are legitimate errors: it has not removed the worker yet, so the shuffle has not been restarted, which would have invalidated all those erred tasks coming in from workers. If P2P tasks fail due to what looks like a legitimate error (or exceed the suspicious count), we will not restart the P2P shuffle.

For the scheduler to remove the worker, its batched stream to the worker needs to be closed. The worker won't close the stream before it has

  • canceled all asynchronous instructions
  • torn down all preloads
  • closed all extensions
  • torn down all plugins
  • torn down the scheduler RPC
  • stopped all services

so there's now plenty of time for task-erred messages to arrive at the scheduler.

Previously, the worker would have stopped its listeners after it had closed the batched stream to the scheduler. So in almost all scenarios, the scheduler would have removed the worker and caused the shuffle to restart before any task-erred messages started coming in. The notable exception is a networking issue where it takes a while for the close-stream message to arrive at the scheduler.

@hendrikmakait
Member

TL;DR: I think it's generally a good idea to close listeners early and not let them potentially interfere with shutdown. The problem is just that it adds a race condition we haven't covered in P2P restarts so far.

The most holistic way of solving this is to handle the errors this causes on the receiving end in P2P. The easiest approach should be retrying with some generously configured limits to ensure that there can never be a deadlock. Checking whether we can stop retrying because the shuffle has been closed (e.g., due to a remote worker leaving) seems like a worthwhile optimization but should not be strictly necessary; cancelled tasks would just block workers longer than necessary.

@hendrikmakait
Member

After merging #7698, P2P shuffle tests have become very flaky. I've tracked this down to a change in the order in which the scheduler is informed about the worker leaving and listeners are being stopped. This now causes the shuffle tasks to fail with a CommClosedError before the scheduler is able to restart the shuffle. As a result, the scheduler will think the tasks genuinely experienced a problem and will not restart them.

This problem has been solved more holistically (see #8088).
