Skip to content

feat: Create endpoint manager for websockets #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: plat-6492-add-network-and-models
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dotenvy = "0.15.7"
email_address = "0.2.9"
ethabi = "18.0.0"
futures = "0.3"
futures-util = "0.3.31"
glob = "0.3"
hex = "0.4"
hmac = "0.12.0"
Expand Down Expand Up @@ -80,7 +81,6 @@ zeroize = { version = "1.8.1", features = ["derive"] }

[dev-dependencies]
cargo-llvm-cov = "0.6"
futures-util = "0.3.31"
mockall = "0.13.1"
mockito = "1.6.1"
once_cell = "1.20.0"
Expand Down
30 changes: 16 additions & 14 deletions src/services/blockchain/clients/midnight/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::{
services::{
blockchain::{
client::BlockChainClient,
transports::{BlockchainTransport, MidnightTransportClient},
BlockFilterFactory, WsTransportClient,
transports::{BlockchainTransport, MidnightHttpTransportClient},
BlockFilterFactory, MidnightWsTransportClient,
},
filter::MidnightBlockFilter,
},
Expand All @@ -44,7 +44,7 @@ impl<H: Send + Sync + Clone, W: Send + Sync + Clone> MidnightClient<H, W> {
}
}

impl MidnightClient<MidnightTransportClient, WsTransportClient> {
impl MidnightClient<MidnightHttpTransportClient, MidnightWsTransportClient> {
/// Creates a new Midnight client instance
///
/// # Arguments
Expand All @@ -53,17 +53,19 @@ impl MidnightClient<MidnightTransportClient, WsTransportClient> {
/// # Returns
/// * `Result<Self, anyhow::Error>` - New client instance or connection error
pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
let http_client = MidnightTransportClient::new(network).await?;
let ws_client = WsTransportClient::new(network).await.map_or_else(
|e| {
// We fail to create a WebSocket client if there are no working URLs
// This limits the functionality of the service, by not allowing monitoring of transaction status or event-related data
// but it is not a critical issue
tracing::warn!("Failed to create WebSocket client: {}", e);
None
},
Some,
);
let http_client = MidnightHttpTransportClient::new(network).await?;
let ws_client = MidnightWsTransportClient::new(network, None)
.await
.map_or_else(
|e| {
// We fail to create a WebSocket client if there are no working URLs
// This limits the functionality of the service, by not allowing monitoring of transaction status or event-related data
// but it is not a critical issue
tracing::warn!("Failed to create WebSocket client: {}", e);
None
},
Some,
);
Ok(Self::new_with_transport(http_client, ws_client))
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/services/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub use clients::{
pub use error::BlockChainError;
pub use pool::{ClientPool, ClientPoolTrait};
pub use transports::{
BlockchainTransport, EVMTransportClient, EndpointManager, HttpTransportClient,
MidnightTransportClient, RotatingTransport, StellarTransportClient,
TransientErrorRetryStrategy, WsTransportClient,
BlockchainTransport, EVMTransportClient, HttpEndpointManager, HttpTransportClient,
MidnightHttpTransportClient, MidnightWsTransportClient, RotatingTransport,
StellarTransportClient, TransientErrorRetryStrategy, WsConfig, WsEndpointManager,
WsTransportClient,
};
8 changes: 4 additions & 4 deletions src/services/blockchain/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::{
models::{BlockChainType, Network},
services::blockchain::{
BlockChainClient, BlockFilterFactory, EVMTransportClient, EvmClient, EvmClientTrait,
MidnightClient, MidnightClientTrait, MidnightTransportClient, StellarClient,
StellarClientTrait, StellarTransportClient, WsTransportClient,
MidnightClient, MidnightClientTrait, MidnightHttpTransportClient,
MidnightWsTransportClient, StellarClient, StellarClientTrait, StellarTransportClient,
},
};
use anyhow::Context;
Expand Down Expand Up @@ -87,7 +87,7 @@ impl ClientPool {
// Register client types
pool.register_client_type::<EvmClient<EVMTransportClient>>(BlockChainType::EVM);
pool.register_client_type::<StellarClient<StellarTransportClient>>(BlockChainType::Stellar);
pool.register_client_type::<MidnightClient<MidnightTransportClient, WsTransportClient>>(
pool.register_client_type::<MidnightClient<MidnightHttpTransportClient, MidnightWsTransportClient>>(
BlockChainType::Midnight,
);

Expand Down Expand Up @@ -148,7 +148,7 @@ impl ClientPool {
impl ClientPoolTrait for ClientPool {
type EvmClient = EvmClient<EVMTransportClient>;
type StellarClient = StellarClient<StellarTransportClient>;
type MidnightClient = MidnightClient<MidnightTransportClient, WsTransportClient>;
type MidnightClient = MidnightClient<MidnightHttpTransportClient, MidnightWsTransportClient>;

/// Gets or creates an EVM client for the given network.
///
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Manages the rotation of blockchain RPC endpoints
//! Manages the rotation of blockchain HTTP RPC endpoints
//!
//! Provides methods for rotating between multiple URLs and sending requests to the active endpoint
//! with automatic fallback to other URLs on failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use url::Url;
use crate::{
models::Network,
services::blockchain::transports::{
BlockchainTransport, EndpointManager, RotatingTransport, TransientErrorRetryStrategy,
http::endpoint_manager::EndpointManager, BlockchainTransport, RotatingTransport,
TransientErrorRetryStrategy,
},
};

Expand Down Expand Up @@ -100,6 +101,7 @@ impl HttpTransportClient {
Err(_) => continue,
};

// TODO: At some point we should re-use `try_connect` to test connectivity
let test_request = if let Some(test_payload) = &test_connection_payload {
serde_json::from_str(test_payload)
.context("Failed to parse test payload as JSON")?
Expand Down Expand Up @@ -189,12 +191,9 @@ impl BlockchainTransport for HttpTransportClient {
where
P: Into<Value> + Send + Clone + Serialize,
{
let response = self
.endpoint_manager
self.endpoint_manager
.send_raw_request(self, method, params)
.await?;

Ok(response)
.await
}

/// Updates the retry policy configuration
Expand Down
126 changes: 126 additions & 0 deletions src/services/blockchain/transports/midnight/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//! Midnight transport implementation for blockchain interactions.
//!
//! This module provides a client implementation for interacting with Midnight-compatible nodes
//! by wrapping the WsTransportClient. This allows for consistent behavior with other
//! transport implementations while providing specific Midnight-focused functionality.

use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::policies::ExponentialBackoff;
use serde::Serialize;
use serde_json::Value;

use crate::{
models::Network,
services::blockchain::{
transports::{
BlockchainTransport, RotatingTransport, TransientErrorRetryStrategy, WsTransportClient,
},
WsConfig,
},
};

/// A client for interacting with Midnight-compatible blockchain nodes via WebSocket
///
/// This implementation wraps the WsTransportClient to provide consistent
/// behavior with other transport implementations while offering Midnight-specific
/// functionality. It handles WebSocket connection management, message handling,
/// and endpoint rotation for Midnight-based networks.
#[derive(Clone, Debug)]
pub struct MidnightTransportClient {
/// The underlying WebSocket transport client that handles actual RPC communications
ws_client: WsTransportClient,
}

impl MidnightTransportClient {
/// Creates a new Midnight transport client by initializing a WebSocket transport client
///
/// # Arguments
/// * `network` - Network configuration containing RPC URLs and other network details
///
/// # Returns
/// * `Result<Self, anyhow::Error>` - A new client instance or connection error
pub async fn new(network: &Network, config: Option<WsConfig>) -> Result<Self, anyhow::Error> {
let ws_client = WsTransportClient::new(network, config).await?;
Ok(Self { ws_client })
}
}

#[async_trait::async_trait]
impl BlockchainTransport for MidnightTransportClient {
/// Gets the current active RPC URL
///
/// # Returns
/// * `String` - The currently active RPC endpoint URL
async fn get_current_url(&self) -> String {
self.ws_client.get_current_url().await
}

/// Sends a raw JSON-RPC request to the Midnight node via WebSocket
///
/// # Arguments
/// * `method` - The JSON-RPC method to call
/// * `params` - Optional parameters to pass with the request
///
/// # Returns
/// * `Result<Value, anyhow::Error>` - The JSON response or error
async fn send_raw_request<P>(
&self,
method: &str,
params: Option<P>,
) -> Result<Value, anyhow::Error>
where
P: Into<Value> + Send + Clone + Serialize,
{
self.ws_client.send_raw_request(method, params).await
}

/// Sets a new retry policy for the transport
///
/// Note: Not applicable for WebSocket transport
fn set_retry_policy(
&mut self,
_retry_policy: ExponentialBackoff,
_retry_strategy: Option<TransientErrorRetryStrategy>,
) -> Result<(), anyhow::Error> {
Err(anyhow::anyhow!(
Copy link
Contributor

@NicoMolinaOZ NicoMolinaOZ May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this to maintain consistency with other transport implementations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, it implements BlockchainTransport where we only need get_current_url and send_raw_request.

We don't need retry policy logic and update_endpoint_manager because our websocket implementwtion doesn't have a middleware client, as opposed to http implementation, which has the reqwest middleware client. Mainly because of how connections are handled in http vs websockets.

As you can see, we don't actually have a client associated in the endpoint manager for ws to update. It simply manages rotation based on weights. Whereas for http it's a bit more complex, where we can configure different retry strategies based on error codes etc.. I hope that makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! thanks for the explanation Nami!

"`set_retry_policy` not implemented for WebSocket transport"
))
}

/// Update endpoint manager with a new client
///
/// Note: Not applicable for WebSocket transport
fn update_endpoint_manager_client(
&mut self,
_client: ClientWithMiddleware,
) -> Result<(), anyhow::Error> {
Err(anyhow::anyhow!(
"`update_endpoint_manager_client` not implemented for WebSocket transport"
))
}
}

#[async_trait::async_trait]
impl RotatingTransport for MidnightTransportClient {
/// Tests connection to a specific WebSocket URL
///
/// # Arguments
/// * `url` - The WebSocket URL to test connection with
///
/// # Returns
/// * `Result<(), anyhow::Error>` - Success or error status
async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
self.ws_client.try_connect(url).await
}

/// Updates the client to use a new WebSocket URL
///
/// # Arguments
/// * `url` - The new WebSocket URL to use for subsequent requests
///
/// # Returns
/// * `Result<(), anyhow::Error>` - Success or error status
async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
self.ws_client.update_client(url).await
}
}
31 changes: 24 additions & 7 deletions src/services/blockchain/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,35 @@ mod stellar {
}
mod midnight {
pub mod http;
pub mod ws;
}

mod endpoint_manager;
mod http;
mod ws;
mod http {
pub mod endpoint_manager;
pub mod transport;
}

mod ws {
pub mod config;
pub mod connection;
pub mod endpoint_manager;
pub mod transport;
}

pub use http::{
endpoint_manager::EndpointManager as HttpEndpointManager, transport::HttpTransportClient,
};
pub use ws::{
config::WsConfig, endpoint_manager::EndpointManager as WsEndpointManager,
transport::WsTransportClient,
};

pub use endpoint_manager::EndpointManager;
pub use evm::http::EVMTransportClient;
pub use http::HttpTransportClient;
pub use midnight::http::MidnightTransportClient;
pub use midnight::{
http::MidnightTransportClient as MidnightHttpTransportClient,
ws::MidnightTransportClient as MidnightWsTransportClient,
};
pub use stellar::http::StellarTransportClient;
pub use ws::WsTransportClient;

use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::{
Expand Down
Loading