Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainhead backend: notify subscribers when the backend is closed #1817

Merged
merged 10 commits into from
Oct 11, 2024
7 changes: 7 additions & 0 deletions subxt/src/backend/chain_head/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ impl<Hash: BlockHash> Shared<Hash> {
pub fn done(&self) {
let mut shared = self.0.lock().unwrap();
shared.done = true;

// Wake up all subscribers so they get notified that the backend was closed
for details in shared.subscribers.values_mut() {
if let Some(waker) = details.waker.take() {
waker.wake();
}
}
}

/// Cleanup a subscription.
Expand Down
12 changes: 6 additions & 6 deletions subxt/src/backend/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
}

let (backend, mut driver) = self.build(client);

spawn(async move {
// NOTE: we need to poll the driver until it's done i.e returns None
// to ensure that the backend is shutdown properly.
while let Some(res) = driver.next().await {
if let Err(e) = res {
if !e.is_disconnected_will_reconnect() {
tracing::debug!(target: "subxt", "chainHead driver was closed: {e}");
break;
}
if let Err(err) = res {
tracing::debug!(target: "subxt", "chainHead backend error={err}");
}
}

tracing::debug!(target: "subxt", "chainHead backend was closed");
});

backend
Expand Down
4 changes: 2 additions & 2 deletions subxt/src/backend/chain_head/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl<T: Config> ChainHeadRpcMethods<T> {
/// The following events are related to operations:
/// - OperationBodyDone: The response of the `chainHead_body`
/// - OperationCallDone: The response of the `chainHead_call`
/// - OperationStorageItems: Items produced by the `chianHead_storage`
/// - OperationStorageItems: Items produced by the `chainHead_storage`
/// - OperationWaitingForContinue: Generated after OperationStorageItems and requires the user to
/// call `chainHead_continue`
/// - OperationStorageDone: The `chainHead_storage` method has produced all the results
Expand Down Expand Up @@ -651,7 +651,7 @@ impl<Hash: BlockHash> Stream for FollowSubscription<Hash> {

if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res {
// No more events will occur after this one.
self.done = true
self.done = true;
}

res
Expand Down
10 changes: 1 addition & 9 deletions subxt/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,15 +893,7 @@ mod test {
}

fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> ChainHeadBackend<Conf> {
let (backend, mut driver) = build_backend(rpc_client);
tokio::spawn(async move {
while let Some(val) = driver.next().await {
if let Err(e) = val {
eprintln!("Error driving unstable backend: {e}; terminating client");
}
}
});
backend
ChainHeadBackend::builder().build_with_background_driver(rpc_client)
}

fn runtime_spec() -> RuntimeSpec {
Expand Down
Loading