Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
fix: fix re-subscription on websocket reconnect (#2419)
Browse files Browse the repository at this point in the history
Add an in-flight request for each re-subscription request to ensure
that the response from the server with the new internal subscription
id is handled.

Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
  • Loading branch information
gbrew and DaniPopes authored May 23, 2023
1 parent 97f891f commit 159e70b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
24 changes: 14 additions & 10 deletions ethers-providers/src/rpc/transports/ws/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,25 +351,29 @@ impl RequestManager {
old_backend.shutdown();

tracing::debug!(count = self.subs.count(), "Re-starting active subscriptions");
let req_cnt = self.reqs.len();

// reissue subscriptionps
// reissue subscriptions
for (id, sub) in self.subs.to_reissue() {
self.backend
.dispatcher
.unbounded_send(sub.serialize_raw(*id)?)
.map_err(|_| WsClientError::DeadChannel)?;
let (tx, _rx) = oneshot::channel();
let in_flight = InFlight {
method: "eth_subscribe".to_string(),
params: sub.params.clone(),
channel: tx,
};
// Need an entry in reqs to ensure response with new server sub ID is processed
self.reqs.insert(*id, in_flight);
}

tracing::debug!(count = self.reqs.len(), "Re-issuing pending requests");
// reissue requests. We filter these to prevent in-flight requests for
// subscriptions to be re-issued twice (once in above loop, once in this loop).
for (id, req) in self.reqs.iter().filter(|(id, _)| !self.subs.has(**id)) {
tracing::debug!(count = req_cnt, "Re-issuing pending requests");
// reissue requests, including the re-subscription requests we just added above
for (id, req) in self.reqs.iter() {
self.backend
.dispatcher
.unbounded_send(req.serialize_raw(*id)?)
.map_err(|_| WsClientError::DeadChannel)?;
}
tracing::info!(subs = self.subs.count(), reqs = self.reqs.len(), "Re-connection complete");
tracing::info!(subs = self.subs.count(), reqs = req_cnt, "Re-connection complete");

Ok(())
}
Expand Down
25 changes: 24 additions & 1 deletion ethers-providers/tests/it/ws_errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ethers_core::types::Filter;
use ethers_core::{types::Filter, utils::Anvil};
use ethers_providers::{Middleware, Provider, StreamExt};
use futures_util::SinkExt;
use std::time::Duration;
Expand Down Expand Up @@ -64,3 +64,26 @@ async fn graceful_disconnect_on_ws_errors() {

assert!(stream.next().await.is_none());
}

#[tokio::test]
async fn resubscribe_on_ws_reconnect() {
let anvil = Anvil::new().block_time(1u64).spawn();
let port = anvil.port();
let provider = Provider::connect_with_reconnects(anvil.ws_endpoint(), 1).await.unwrap();

// Attempt to ensure a different server-side subscription id after reconnect by making
// the subscription we care about be the second one after initial startup, but the first
// (and only) one after reconnection.
let ignored_sub = provider.subscribe_blocks().await.unwrap();
let mut blocks = provider.subscribe_blocks().await.unwrap();
ignored_sub.unsubscribe().await.expect("unsubscribe failed");

blocks.next().await.expect("no block notice before reconnect");

// Kill & restart using the same port so we end up with the same endpoint url:
drop(anvil);
let _anvil = Anvil::new().port(port).block_time(1u64).spawn();

// Wait for the next block on existing subscription. Will fail w/o resubscription:
blocks.next().await.expect("no block notice after reconnect");
}

0 comments on commit 159e70b

Please sign in to comment.