Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Hrana concurrency #240

Merged
merged 4 commits into from
Feb 22, 2023
Merged

Hrana concurrency #240

merged 4 commits into from
Feb 22, 2023

Conversation

honzasp
Copy link
Contributor

@honzasp honzasp commented Feb 19, 2023

The Hrana protocol allows to run statements on different streams concurrently. This PR implements this by spawning a task for each stream. For each request, we pack it into a "job" and send it to the task using a mpsc channel. The response is sent to the code in hrana::conn using a oneshot channel.

The code becomes more complicated, but I think the solution in this PR is still reasonably clear.

Unfortunately, the compulsory rustfmt formatting messed up the code, obscured the structure and made the code harder to read.

Stacks on top of #231.

@honzasp honzasp requested a review from MarinPostma February 19, 2023 20:31
@honzasp honzasp changed the base branch from hrana-server to main February 21, 2023 09:57
@honzasp honzasp marked this pull request as ready for review February 21, 2023 14:26
Comment on lines +122 to +124
stream_respond(&mut stream_hnd, resp_tx, |_| {
Box::pin(async move { Ok(proto::Response::CloseStream(proto::CloseStreamResp {})) })
})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry for the silly formatting, this should read as

stream_respond(&mut stream_hnd, resp_tx, |_| Box::pin(async move {
    Ok(proto::Response::CloseStream(proto::CloseStreamResp {}))
})})

@honzasp
Copy link
Contributor Author

honzasp commented Feb 22, 2023

Alternative solution: instead of passing jobs through a mpsc channel to a per-stream task, we could introduce a synchronization structure that would work like Tokio Mutex but would guarantee correct ordering; something like

struct SerialMutex<T> { ... }
struct SerialMutexWaiter<T> { ... }
struct SerialMutexGuard<T> { ... }

impl SerialMutex<T> {
    pub fn acquire(&mut self) -> SerialMutexWaiter<T>;
}

impl SerialMutexWaiter<T> {
    pub async fn wait() -> SerialMutexGuard<T>;
}

impl Deref for SerialMutexWaiter<T> {
    type Target = T;
}
impl DerefMut for SerialMutexWaiter<T> {
}

And each job would spawn its own task, which would wait for the Waiter passed from the parent task:

let stream_waiter = stream_mutex.acquire();
join_set.spawn(async move {
    let mut stream = stream_waiter.wait().await;
    // use stream as `&mut Stream`
});

This solution avoids passing boxed async functions through channels, but it would require a custom synchronization structure.

Comment on lines +239 to +244
// do not propagate this error, because the error that caused the receiver to drop
// is very likely propagating from another task at this moment, and we don't want
// to hide it.
// this is also the reason why we need to use `Fuse` in self.response_rx
tracing::warn!("Response sender was dropped");
Poll::Pending
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the assumption that the application will crash and this resource will be freed?
Because otherwise, this future will remain the FuturesUnordered forever

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the assumption. I feel a bit uneasy about this, but I have very bad experience with programs crashing due to "channel closed" errors, without giving a clue about the true source of the error.

The worst thing that can happen is if we drop the sender without raising an error, which would cause the futures to accumulate. The warning is an attempt to mitigate this, because the logs will show that something is wrong.

@honzasp
Copy link
Contributor Author

honzasp commented Feb 22, 2023

bors merge
After an informal approval from @MarinPostma

@bors
Copy link
Contributor

bors bot commented Feb 22, 2023

Build succeeded:

@bors bors bot merged commit af8d7d7 into libsql:main Feb 22, 2023
@honzasp honzasp deleted the hrana-concur branch February 27, 2023 10:10
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants