-
Notifications
You must be signed in to change notification settings - Fork 38
Conversation
9eabbfd
to
bcba0f6
Compare
stream_respond(&mut stream_hnd, resp_tx, |_| { | ||
Box::pin(async move { Ok(proto::Response::CloseStream(proto::CloseStreamResp {})) }) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry for the silly formatting, this should read as
stream_respond(&mut stream_hnd, resp_tx, |_| Box::pin(async move {
Ok(proto::Response::CloseStream(proto::CloseStreamResp {}))
})})
Alternative solution: instead of passing jobs through a mpsc channel to a per-stream task, we could introduce a synchronization structure that would work like Tokio struct SerialMutex<T> { ... }
struct SerialMutexWaiter<T> { ... }
struct SerialMutexGuard<T> { ... }
impl SerialMutex<T> {
pub fn acquire(&mut self) -> SerialMutexWaiter<T>;
}
impl SerialMutexWaiter<T> {
pub async fn wait() -> SerialMutexGuard<T>;
}
impl Deref for SerialMutexWaiter<T> {
type Target = T;
}
impl DerefMut for SerialMutexWaiter<T> {
} And each job would spawn its own task, which would wait for the Waiter passed from the parent task: let stream_waiter = stream_mutex.acquire();
join_set.spawn(async move {
let mut stream = stream_waiter.wait().await;
// use stream as `&mut Stream`
}); This solution avoids passing boxed async functions through channels, but it would require a custom synchronization structure. |
// do not propagate this error, because the error that caused the receiver to drop | ||
// is very likely propagating from another task at this moment, and we don't want | ||
// to hide it. | ||
// this is also the reason why we need to use `Fuse` in self.response_rx | ||
tracing::warn!("Response sender was dropped"); | ||
Poll::Pending |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the assumption that the application will crash and this resource will be freed?
Because otherwise, this future will remain the FuturesUnordered
forever
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's the assumption. I feel a bit uneasy about this, but I have very bad experience with programs crashing due to "channel closed" errors, without giving a clue about the true source of the error.
The worst thing that can happen is if we drop the sender without raising an error, which would cause the futures to accumulate. The warning is an attempt to mitigate this, because the logs will show that something is wrong.
bors merge |
Build succeeded:
|
The Hrana protocol allows to run statements on different streams concurrently. This PR implements this by spawning a task for each stream. For each request, we pack it into a "job" and send it to the task using a mpsc channel. The response is sent to the code in
hrana::conn
using a oneshot channel.The code becomes more complicated, but I think the solution in this PR is still reasonably clear.
Unfortunately, the compulsory
rustfmt
formatting messed up the code, obscured the structure and made the code harder to read.Stacks on top of #231.