Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed RPC subscriptions leak when subscription stream is finished #4533

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions prdoc/pr_4533.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "Fixed RPC subscriptions leak when subscription stream is finished"

doc:
- audience: Node Operator
description: |
The node may leak RPC subscriptions in some cases, e.g. during
`author_submitAndWatchExtrinsic` calls. This PR fixes the issue.

crates:
- name: sc-rpc
26 changes: 25 additions & 1 deletion substrate/client/rpc/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn inner_pipe_from_stream<S, T>(
//
// Process remaining items and terminate.
Either::Right((Either::Right((None, pending_fut)), _)) => {
if pending_fut.await.is_err() {
if !pending_fut.is_terminated() && pending_fut.await.is_err() {
return;
}

Expand Down Expand Up @@ -231,4 +231,28 @@ mod tests {
_ = rx.next().await.unwrap();
assert!(sub.next::<usize>().await.is_none());
}

#[tokio::test]
async fn subscription_is_dropped_when_stream_is_empty() {
let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
let notify_tx = notify_rx.clone();

let mut module = RpcModule::new(notify_tx);
module
.register_subscription("sub", "my_sub", "unsub", |_, pending, notify_tx| async move {
// emulate empty stream for simplicity: otherwise we need some mechanism
// to sync buffer and channel send operations
let stream = futures::stream::empty::<()>();
// this should exit immediately
pipe_from_stream(pending, stream).await;
// notify that the `pipe_from_stream` has returned
notify_tx.notify_one();
Ok(())
})
.unwrap();
module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();

// it should fire once `pipe_from_stream` returns
notify_rx.notified().await;
}
}
Loading