diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index b79e91f681b97f..44663b3372cb2c 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -183,10 +183,9 @@ use { RpcTransactionLogsFilter, }, error_object::RpcErrorObject, - filter::maybe_map_filters, response::{ Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse, - RpcSignatureResult, RpcVersionInfo, RpcVote, SlotInfo, SlotUpdate, + RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, }, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}, @@ -194,7 +193,7 @@ use { thiserror::Error, tokio::{ net::TcpStream, - sync::{mpsc, oneshot, RwLock}, + sync::{mpsc, oneshot}, task::JoinHandle, time::{sleep, Duration}, }, @@ -265,9 +264,8 @@ type RequestMsg = ( #[derive(Debug)] pub struct PubsubClient { subscribe_sender: mpsc::UnboundedSender, - request_sender: mpsc::UnboundedSender, + _request_sender: mpsc::UnboundedSender, shutdown_sender: oneshot::Sender<()>, - node_version: RwLock>, ws: JoinHandle, } @@ -279,14 +277,14 @@ impl PubsubClient { .map_err(PubsubClientError::ConnectionError)?; let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); - let (request_sender, request_receiver) = mpsc::unbounded_channel(); + let (_request_sender, request_receiver) = mpsc::unbounded_channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + #[allow(clippy::used_underscore_binding)] Ok(Self { subscribe_sender, - request_sender, + _request_sender, shutdown_sender, - node_version: RwLock::new(None), ws: tokio::spawn(PubsubClient::run_ws( ws, subscribe_receiver, @@ -301,43 +299,11 @@ impl PubsubClient { self.ws.await.unwrap() // WS future should not be cancelled or panicked } - pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> { - let mut w_node_version = self.node_version.write().await; - *w_node_version = Some(version); + #[deprecated(since = "2.0.2", note = "PubsubClient::node_version is no longer used")] + pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> { Ok(()) } - async fn get_node_version(&self) -> PubsubClientResult { - let r_node_version = self.node_version.read().await; - if let Some(version) = &*r_node_version { - Ok(version.clone()) - } else { - drop(r_node_version); - let mut w_node_version = self.node_version.write().await; - let node_version = self.get_version().await?; - *w_node_version = Some(node_version.clone()); - Ok(node_version) - } - } - - async fn get_version(&self) -> PubsubClientResult { - let (response_sender, response_receiver) = oneshot::channel(); - self.request_sender - .send(("getVersion".to_string(), Value::Null, response_sender)) - .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?; - let result = response_receiver - .await - .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??; - let node_version: RpcVersionInfo = serde_json::from_value(result)?; - let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| { - PubsubClientError::RequestFailed { - reason: format!("failed to parse cluster version: {e}"), - message: "getVersion".to_string(), - } - })?; - Ok(node_version) - } - async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T> where T: DeserializeOwned + Send + 'a, @@ -426,22 +392,8 @@ impl PubsubClient { pub async fn program_subscribe( &self, pubkey: &Pubkey, - mut config: Option, + config: Option, ) -> SubscribeResult<'_, RpcResponse> { - if let Some(ref mut config) = config { - if let Some(ref mut filters) = config.filters { - let node_version = self.get_node_version().await.ok(); - // If node does not support the pubsub `getVersion` method, assume version is old - // and filters should be mapped (node_version.is_none()). - maybe_map_filters(node_version, filters).map_err(|e| { - PubsubClientError::RequestFailed { - reason: e, - message: "maybe_map_filters".to_string(), - } - })?; - } - } - let params = json!([pubkey.to_string(), config]); self.subscribe("program", params).await } diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 70769619db1f4d..5247bdb8b9e263 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -103,7 +103,6 @@ use { RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, }, - filter, response::{ Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, @@ -207,35 +206,6 @@ where .map_err(|err| err.into()) } - fn get_version( - writable_socket: &Arc>>>, - ) -> Result { - writable_socket.write().unwrap().send(Message::Text( - json!({ - "jsonrpc":"2.0","id":1,"method":"getVersion", - }) - .to_string(), - ))?; - let message = writable_socket.write().unwrap().read()?; - let message_text = &message.into_text()?; - - if let Ok(json_msg) = serde_json::from_str::>(message_text) { - if let Some(Object(version_map)) = json_msg.get("result") { - if let Some(node_version) = version_map.get("solana-core") { - if let Some(node_version) = node_version.as_str() { - if let Ok(parsed) = semver::Version::parse(node_version) { - return Ok(parsed); - } - } - } - } - } - - Err(PubsubClientError::UnexpectedGetVersionResponse(format!( - "msg={message_text}" - ))) - } - fn read_message( writable_socket: &Arc>>>, ) -> Result, PubsubClientError> { @@ -523,7 +493,7 @@ impl PubsubClient { pub fn program_subscribe( url: &str, pubkey: &Pubkey, - mut config: Option, + config: Option, ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; @@ -534,16 +504,6 @@ impl PubsubClient { let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); - if let Some(ref mut config) = config { - if let Some(ref mut filters) = config.filters { - let node_version = PubsubProgramClientSubscription::get_version(&socket_clone).ok(); - // If node does not support the pubsub `getVersion` method, assume version is old - // and filters should be mapped (node_version.is_none()). - filter::maybe_map_filters(node_version, filters) - .map_err(PubsubClientError::RequestError)?; - } - } - let body = json!({ "jsonrpc":"2.0", "id":1, diff --git a/rpc-client-api/src/filter.rs b/rpc-client-api/src/filter.rs index 9af8cc0784c1ff..bc94da2938b8e8 100644 --- a/rpc-client-api/src/filter.rs +++ b/rpc-client-api/src/filter.rs @@ -1,6 +1,5 @@ #![allow(deprecated)] use { - crate::version_req::VersionReq, solana_inline_spl::{token::GenericTokenAccount, token_2022::Account}, solana_sdk::account::{AccountSharedData, ReadableAccount}, std::borrow::Cow, @@ -298,34 +297,6 @@ impl From for Memcmp { } } -pub fn maybe_map_filters( - node_version: Option, - filters: &mut [RpcFilterType], -) -> Result<(), String> { - let version_reqs = VersionReq::from_strs(&["<1.11.2", "~1.13"])?; - let needs_mapping = node_version - .map(|version| version_reqs.matches_any(&version)) - .unwrap_or(true); - if needs_mapping { - for filter in filters.iter_mut() { - if let RpcFilterType::Memcmp(memcmp) = filter { - match &memcmp.bytes { - MemcmpEncodedBytes::Base58(string) => { - memcmp.bytes = MemcmpEncodedBytes::Binary(string.clone()); - } - MemcmpEncodedBytes::Base64(_) => { - return Err("RPC node on old version does not support base64 \ - encoding for memcmp filters" - .to_string()); - } - _ => {} - } - } - } - } - Ok(()) -} - #[cfg(test)] mod tests { use super::*; diff --git a/rpc-client-api/src/lib.rs b/rpc-client-api/src/lib.rs index 9615efe24ba3a2..b2484637766ce7 100644 --- a/rpc-client-api/src/lib.rs +++ b/rpc-client-api/src/lib.rs @@ -7,7 +7,6 @@ pub mod error_object; pub mod filter; pub mod request; pub mod response; -pub mod version_req; #[macro_use] extern crate serde_derive; diff --git a/rpc-client-api/src/version_req.rs b/rpc-client-api/src/version_req.rs deleted file mode 100644 index 8c8d57e35c2610..00000000000000 --- a/rpc-client-api/src/version_req.rs +++ /dev/null @@ -1,20 +0,0 @@ -pub(crate) struct VersionReq(Vec); - -impl VersionReq { - pub(crate) fn from_strs(versions: &[T]) -> Result - where - T: AsRef + std::fmt::Debug, - { - let mut version_reqs = vec![]; - for version in versions { - let version_req = semver::VersionReq::parse(version.as_ref()) - .map_err(|err| format!("Could not parse version {version:?}: {err:?}"))?; - version_reqs.push(version_req); - } - Ok(Self(version_reqs)) - } - - pub(crate) fn matches_any(&self, version: &semver::Version) -> bool { - self.0.iter().any(|r| r.matches(version)) - } -} diff --git a/rpc-client/src/nonblocking/rpc_client.rs b/rpc-client/src/nonblocking/rpc_client.rs index a7c5359d196ccc..0ca5f76a49f829 100644 --- a/rpc-client/src/nonblocking/rpc_client.rs +++ b/rpc-client/src/nonblocking/rpc_client.rs @@ -32,7 +32,6 @@ use { Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult, }, config::{RpcAccountInfoConfig, *}, - filter::{self, RpcFilterType}, request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter}, response::*, }, @@ -57,7 +56,7 @@ use { str::FromStr, time::{Duration, Instant}, }, - tokio::{sync::RwLock, time::sleep}, + tokio::time::sleep, }; /// A client of a remote Solana node. @@ -141,7 +140,6 @@ use { pub struct RpcClient { sender: Box, config: RpcClientConfig, - node_version: RwLock>, } impl RpcClient { @@ -157,7 +155,6 @@ impl RpcClient { ) -> Self { Self { sender: Box::new(sender), - node_version: RwLock::new(None), config, } } @@ -509,30 +506,11 @@ impl RpcClient { self.sender.url() } - pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> { - let mut w_node_version = self.node_version.write().await; - *w_node_version = Some(version); + #[deprecated(since = "2.0.2", note = "RpcClient::node_version is no longer used")] + pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> { Ok(()) } - async fn get_node_version(&self) -> Result { - let r_node_version = self.node_version.read().await; - if let Some(version) = &*r_node_version { - Ok(version.clone()) - } else { - drop(r_node_version); - let mut w_node_version = self.node_version.write().await; - let node_version = self.get_version().await.map_err(|e| { - RpcError::RpcRequestError(format!("cluster version query failed: {e}")) - })?; - let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| { - RpcError::RpcRequestError(format!("failed to parse cluster version: {e}")) - })?; - *w_node_version = Some(node_version.clone()); - Ok(node_version) - } - } - /// Get the configured default [commitment level][cl]. /// /// [cl]: https://solana.com/docs/rpc#configuring-state-commitment @@ -550,17 +528,6 @@ impl RpcClient { self.config.commitment_config } - #[allow(deprecated)] - async fn maybe_map_filters( - &self, - mut filters: Vec, - ) -> Result, RpcError> { - let node_version = self.get_node_version().await?; - filter::maybe_map_filters(Some(node_version), &mut filters) - .map_err(RpcError::RpcRequestError)?; - Ok(filters) - } - /// Submit a transaction and wait for confirmation. /// /// Once this function returns successfully, the given transaction is @@ -895,11 +862,7 @@ impl RpcClient { transaction: &impl SerializableTransaction, config: RpcSendTransactionConfig, ) -> ClientResult { - let encoding = if let Some(encoding) = config.encoding { - encoding - } else { - self.default_cluster_transaction_encoding().await? - }; + let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64); let preflight_commitment = CommitmentConfig { commitment: config.preflight_commitment.unwrap_or_default(), }; @@ -1185,16 +1148,6 @@ impl RpcClient { } } - async fn default_cluster_transaction_encoding( - &self, - ) -> Result { - if self.get_node_version().await? < semver::Version::new(1, 3, 16) { - Ok(UiTransactionEncoding::Base58) - } else { - Ok(UiTransactionEncoding::Base64) - } - } - /// Simulates sending a transaction. /// /// If the transaction fails, then the [`err`] field of the returned @@ -1344,11 +1297,7 @@ impl RpcClient { transaction: &impl SerializableTransaction, config: RpcSimulateTransactionConfig, ) -> RpcResult { - let encoding = if let Some(encoding) = config.encoding { - encoding - } else { - self.default_cluster_transaction_encoding().await? - }; + let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64); let commitment = config.commitment.unwrap_or_default(); let config = RpcSimulateTransactionConfig { encoding: Some(encoding), @@ -4046,9 +3995,6 @@ impl RpcClient { .commitment .unwrap_or_else(|| self.commitment()); config.account_config.commitment = Some(commitment); - if let Some(filters) = config.filters { - config.filters = Some(self.maybe_map_filters(filters).await?); - } let accounts = self .send::>>(