Forward 0MQ messages for StdIn over crossbeam channel #58

Merged: 5 commits into main from refactor/stdin-channels, Jun 30, 2023

Conversation

lionel- (Contributor) commented Jun 28, 2023

Progress towards posit-dev/positron#535.

We need to communicate an interrupt to the StdIn routine that waits for a reply on its 0MQ socket. Unfortunately there is no way to poll/select over a mix of crossbeam channels and 0MQ sockets. Forwarding the interrupt as a 0MQ message so we could poll multiple sockets felt too low level, so instead we forward 0MQ messages over a channel. This way we can select! over two channels, one for incoming messages and one for interrupts.

Because 0MQ sockets can't be shared by multiple threads, and to keep things at the same level of abstraction, outgoing messages are also sent over a channel instead of a socket. StdIn no longer owns its socket and it exclusively communicates with channels.
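
Concretely, the StdIn loop can then look something like this minimal sketch. This is not Amalthea's actual code: the channel names, the `Message` placeholder, and the handling are illustrative only.

```rust
use crossbeam_channel::{select, Receiver};

// Placeholder for the deserialized Jupyter message type.
struct Message;

fn stdin_loop(incoming_rx: Receiver<Message>, interrupt_rx: Receiver<()>) {
    loop {
        select! {
            // A forwarded 0MQ message (e.g. an input reply) is ready.
            recv(incoming_rx) -> msg => match msg {
                Ok(_reply) => { /* handle the input reply here */ },
                Err(_) => break, // channel disconnected: shut down
            },
            // An interrupt notification: abandon the pending request.
            recv(interrupt_rx) -> _ => continue,
        }
    }
}
```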

The conversion between 0MQ and crossbeam is supported by two new threads:

  • The notifier thread watches channels of outgoing messages for readiness. When a channel is readable, a notification is sent to the forwarding thread via a dedicated 0MQ socket.

  • The forwarding thread polls 0MQ sockets for incoming messages and for notifications from the notifier thread. When an incoming message is ready, it's forwarded to the corresponding channel. When an outgoing message is ready, it's forwarded to the corresponding socket.

    This is implemented naively for now as we are only managing a single socket/channel for StdIn, but it can be extended to multiple sockets in the future. Some parts of the code are already written with multiple sockets in mind (where this was easy to do).
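
For reference, here is a rough sketch of the forwarding loop under these assumptions, using the `zmq` crate's polling API with the single StdIn socket plus the notification PAIR socket. Names and types are illustrative, not the actual implementation.

```rust
use crossbeam_channel::{Receiver, Sender};

fn forwarding_thread(
    stdin_socket: zmq::Socket,  // now owned by the forwarding thread
    notif_socket: zmq::Socket,  // PAIR socket written to by the notifier thread
    incoming_tx: Sender<zmq::Message>,
    outgoing_rx: Receiver<zmq::Message>,
) {
    loop {
        // Block until either socket has something to read.
        let mut items = [
            stdin_socket.as_poll_item(zmq::POLLIN),
            notif_socket.as_poll_item(zmq::POLLIN),
        ];
        zmq::poll(&mut items, -1).expect("polling failed");

        // Incoming 0MQ message on StdIn: forward it over the channel.
        if items[0].is_readable() {
            let msg = stdin_socket.recv_msg(0).unwrap();
            incoming_tx.send(msg).unwrap();
        }

        // Notification from the notifier thread: an outgoing message is
        // waiting on the channel, write it out to the 0MQ socket.
        if items[1].is_readable() {
            let _ = notif_socket.recv_msg(0).unwrap();
            let out = outgoing_rx.recv().unwrap();
            stdin_socket.send(out, 0).unwrap();
        }
    }
}
```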

Other supporting features:

  • Socket::new_pair() to create the PAIR sockets used for notification.
  • Socket::socket is now public to allow lower level control, e.g. for polling.
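
For illustration, this is roughly what the notification pair amounts to with the raw `zmq` crate; `Socket::new_pair()`'s actual signature may differ and the inproc endpoint name is made up.

```rust
// Not Amalthea's actual `Socket::new_pair()`, just the underlying idea.
fn new_notification_pair(ctx: &zmq::Context) -> zmq::Result<(zmq::Socket, zmq::Socket)> {
    let endpoint = "inproc://stdin-notifications";

    let bind_side = ctx.socket(zmq::PAIR)?;
    bind_side.bind(endpoint)?;

    let connect_side = ctx.socket(zmq::PAIR)?;
    connect_side.connect(endpoint)?;

    // With the underlying socket accessible, callers can take part in
    // lower-level operations such as `as_poll_item(zmq::POLLIN)`.
    Ok((bind_side, connect_side))
}
```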

As I'm reflecting on this PR, this mechanism does feel a bit heavy relative to the payoff of being able to select!() over channels in StdIn. Hopefully it will be useful for other cases as well in the future.

lionel- (Contributor, Author) commented Jun 28, 2023

Now I'm slightly worried about message ordering. It's not an issue at the moment since only one socket is managed, but extending this mechanism to multiple sockets/channels will require some thought about messaging consistency.

lionel- (Contributor, Author) commented Jun 28, 2023

Actually, if we manage to make the forwarding/notifier thread consistent regarding message ordering, it could help us solve message passing races like the one I just documented in 59559e2.

The race happens between the StdIn and Shell sockets as they might send 0MQ messages out of order depending on how the threads are scheduled. It could be solved by pulling part of the listen() functionality out of the StdIn thread and into a function that would be called on the R thread. This function would block until the input request has been sent to the 0MQ socket, to ensure ordering of the messages.

However, since sockets can't be shared across threads, we can't just send the 0MQ message from the R thread. That's where a notifier thread managing all sockets (or in this case both Shell and StdIn) would help, since channels can be shared across threads. We could block until we've sent the message on the channel, and we'd have a guarantee that our message will be delivered before the Shell's message.

This race is not critical at all and very unlikely to happen in practice, but it's a good practical case to think about.

lionel- (Contributor, Author) commented Jun 29, 2023

Actually, it was easy to ensure ordering of outgoing messages sent to the forwarding/notifier threads. We just need every outgoing message to go through a single channel so they inherit its FIFO ordering. The last commit implements that, with a new enum OutgoingMessage whose variants like StdIn(Message) associate an outgoing message with a socket.

With this setup we will be able to send messages through the outbound channel when ordering of outgoing messages is important, for instance to solve the race documented above.
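
A sketch of the idea (only the `StdIn(Message)` variant is mentioned above; the `Shell` variant and the helper function are hypothetical and just show how it would be used):

```rust
use crossbeam_channel::Sender;

// Placeholder for the Jupyter message type.
struct Message;

enum OutgoingMessage {
    StdIn(Message),
    Shell(Message), // hypothetical: more sockets added as they are managed
}

fn request_input(outgoing_tx: &Sender<OutgoingMessage>, req: Message) {
    // All outgoing messages go through this single channel, so anything
    // sent afterwards (e.g. a Shell reply) is guaranteed to reach the
    // forwarding thread, and hence 0MQ, after this request.
    outgoing_tx.send(OutgoingMessage::StdIn(req)).unwrap();
}
```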

jmcphers (Contributor) left a comment

I had tried something like this early on in Amalthea's design but hadn't hit on the solution you found here that polls over multiple sockets. Nice work!

@lionel- lionel- merged commit 4be01aa into main Jun 30, 2023
@lionel- lionel- deleted the refactor/stdin-channels branch June 30, 2023 10:24
@lionel- lionel- mentioned this pull request Oct 10, 2024
lionel- added a commit that referenced this pull request Oct 10, 2024
Closes #569

This PR fixes a race condition regarding subscriptions to IOPub that causes clients to miss IOPub messages:

- On startup a client connects to the server sockets of a kernel.
- The client sends a request on Shell.
- The kernel starts processing the request and emits busy on IOPub.

If the client hasn't been able to fully subscribe to IOPub, messages can be lost, in particular the Busy message that encloses the request output.

On the Positron side we fixed it by sending kernel-info requests in a loop until we get a Ready message on IOPub. This signals Positron that the kernel is fully connected and in the Ready state: posit-dev/positron#2207. We haven't implemented a similar fix in our dummy clients for integration tests and we believe this is what is causing the race condition described in #569.

As noted in posit-dev/positron#2207, there is an accepted JEP proposal (JEP 65) that aims at solving this problem by switching to XPUB.

https://jupyter.org/enhancement-proposals/65-jupyter-xpub/jupyter-xpub.html
jupyter/enhancement-proposals#65

The XPUB socket allows the server to get notified of all new subscriptions. A message of type `iopub_welcome` is sent to all connected clients. They should generally ignore it, but clients that have just started up can use it as a cue that IOPub is correctly connected and that they won't miss any output from that point on.

Approach:

The subscription notification comes in as a message on the IOPub socket. This is problematic because the IOPub thread now needs to listen to its crossbeam channel and to the 0MQ socket at the same time, which isn't possible without resorting to timeout polling. So we use the same approach and infrastructure that we implemented in #58 for listening to both input replies on the StdIn socket and interrupt notifications on a crossbeam channel. The forwarding thread now owns the IOPub socket, listens for subscription notifications, and forwards IOPub messages coming from the kernel components.
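
As a hedged illustration of the XPUB mechanics (not the actual ark/Amalthea code): in ZeroMQ, an XPUB socket delivers a subscription event as a message whose first byte is 1 for subscribe (0 for unsubscribe), followed by the topic. The forwarding thread can react to it by emitting the welcome message; the handler and helper below are stand-ins.

```rust
// Hypothetical handler for a subscription event on the XPUB IOPub socket.
fn handle_subscription_event(iopub_socket: &zmq::Socket) -> zmq::Result<()> {
    // An XPUB socket delivers `[1, topic...]` on subscribe and
    // `[0, topic...]` on unsubscribe.
    let event = iopub_socket.recv_bytes(0)?;
    if event.first() == Some(&1) {
        // A client just subscribed: send `iopub_welcome` so it knows it is
        // fully connected and won't miss output from this point on.
        send_welcome(iopub_socket)?;
    }
    Ok(())
}

// Stand-in for building and sending the real multipart `iopub_welcome`
// Jupyter message (identities, signature, header, etc. omitted here).
fn send_welcome(_socket: &zmq::Socket) -> zmq::Result<()> {
    Ok(())
}
```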

---

* Start moving IOPub messages to forwarding thread

* Remove unused import

* Resolve the roundabout `Message` problem

The solution was to move the conversion to `JupyterMessage<T>` up into the match, so we "know" what `T` is!

* Use correct `Welcome` `MessageType`

* Implement `SubscriptionMessage` support and switch to `XPUB`

* The `Welcome` message doesn't come from ark

* Use `amalthea::Result`

* Add more comments

---------


Co-authored-by: Davis Vaughan <davis@posit.co>
Co-authored-by: Lionel Henry <lionel@posit.co>
lionel- added a commit that referenced this pull request Oct 10, 2024