Open
Description
It took me some time to debug.
recv() is not cancel-safe because of self.current_request.take()
. I have to spawn a tokio task just for ZMQ, if I want to use tokio::select {} with ReqSocket. There might be other cases where it's not safe to cancel, and lead to inconsistent data.
#[async_trait]
impl SocketRecv for ReqSocket {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
match self.current_request.take() {
Some(peer_id) => {
if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
let message = peer.recv_queue.next().await;
match message {
Some(Ok(Message::Message(mut m))) => {
assert!(m.len() > 1);
assert!(m.pop_front().unwrap().is_empty()); // Ensure that we have delimeter as first part
Ok(m)
}
Some(Ok(_)) => todo!(),
Some(Err(error)) => Err(error.into()),
None => Err(ZmqError::NoMessage),
}
} else {
Err(ZmqError::Other("Server disconnected"))
}
}
None => Err(ZmqError::Other("Unable to recv. No request in progress")),
}
}
}
Metadata
Metadata
Assignees
Labels
No labels