Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
RPC Block Subscription (backport #21787) (#21992)
Browse files Browse the repository at this point in the history
* RPC Block Subscription (#21787)

* add stuff

* compiling

* add notify block

* wip

* feat: add blockSubscribe pubsub method

* address PR comments

Co-authored-by: Lucas B <buffalu@jito.network>
Co-authored-by: Zano <segfaultdoctor@protonmail.com>
(cherry picked from commit 76098dd)

# Conflicts:
#	Cargo.lock
#	client-test/Cargo.toml
#	rpc/src/rpc_subscriptions.rs

* Fix conflicts

Co-authored-by: segfaultdoctor <seg@jito.network>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
  • Loading branch information
3 people authored Dec 18, 2021
1 parent 02cfa85 commit 35ee48b
Show file tree
Hide file tree
Showing 20 changed files with 1,026 additions and 161 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = "2021"
serde_json = "1.0.72"
serial_test = "0.5.1"
solana-client = { path = "../client", version = "=1.9.2" }
solana-ledger = { path = "../ledger", version = "=1.9.2" }
solana-measure = { path = "../measure", version = "=1.9.2" }
solana-merkle-tree = { path = "../merkle-tree", version = "=1.9.2" }
solana-metrics = { path = "../metrics", version = "=1.9.2" }
Expand All @@ -23,6 +24,7 @@ solana-runtime = { path = "../runtime", version = "=1.9.2" }
solana-sdk = { path = "../sdk", version = "=1.9.2" }
solana-streamer = { path = "../streamer", version = "=1.9.2" }
solana-test-validator = { path = "../test-validator", version = "=1.9.2" }
solana-transaction-status = { path = "../transaction-status", version = "=1.9.2" }
solana-version = { path = "../version", version = "=1.9.2" }
systemstat = "0.1.10"

Expand Down
131 changes: 124 additions & 7 deletions client-test/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ use {
solana_client::{
pubsub_client::PubsubClient,
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::SlotInfo,
rpc_config::{
RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
RpcProgramAccountsConfig,
},
rpc_response::{RpcBlockUpdate, SlotInfo},
},
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc::create_test_transactions_and_populate_blockstore,
rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_subscriptions::RpcSubscriptions,
},
Expand All @@ -20,7 +25,7 @@ use {
},
solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
commitment_config::{CommitmentConfig, CommitmentLevel},
native_token::sol_to_lamports,
pubkey::Pubkey,
rpc_port,
Expand All @@ -29,11 +34,12 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
solana_test_validator::TestValidator,
solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread::sleep,
Expand Down Expand Up @@ -119,9 +125,10 @@ fn test_account_subscription() {
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bob = Keypair::new();

let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Expand Down Expand Up @@ -194,6 +201,112 @@ fn test_account_subscription() {
assert_eq!(errors, [].to_vec());
}

#[test]
#[serial]
fn test_block_subscription() {
// setup BankForks
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo {
genesis_config,
mint_keypair: alice,
..
} = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));

// setup Blockstore
let ledger_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&ledger_path).unwrap();
let blockstore = Arc::new(blockstore);

// populate ledger with test txs
let bank = bank_forks.read().unwrap().working_bank();
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
let keypair3 = Keypair::new();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
let _confirmed_block_signatures = create_test_transactions_and_populate_blockstore(
vec![&alice, &keypair1, &keypair2, &keypair3],
0,
bank,
blockstore.clone(),
max_complete_transaction_status_slot,
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
// setup RpcSubscriptions && PubSubService
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
&exit,
max_complete_transaction_status_slot,
blockstore.clone(),
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
));
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let pub_cfg = PubSubConfig {
enable_block_subscription: true,
..PubSubConfig::default()
};
let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr);

std::thread::sleep(Duration::from_millis(400));

// setup PubsubClient
let (mut client, receiver) = PubsubClient::block_subscribe(
&format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
}),
encoding: Some(UiTransactionEncoding::Json),
transaction_details: Some(TransactionDetails::Signatures),
show_rewards: None,
}),
)
.unwrap();

// trigger Gossip notification
let slot = bank_forks.read().unwrap().highest_slot();
subscriptions.notify_gossip_subscribers(slot);
let maybe_actual = receiver.recv_timeout(Duration::from_millis(400));
match maybe_actual {
Ok(actual) => {
let complete_block = blockstore.get_complete_block(slot, false).unwrap();
let block = complete_block.clone().configure(
UiTransactionEncoding::Json,
TransactionDetails::Signatures,
false,
);
let expected = RpcBlockUpdate {
slot,
block: Some(block),
err: None,
};
let block = complete_block.configure(
UiTransactionEncoding::Json,
TransactionDetails::Signatures,
false,
);
assert_eq!(actual.value.slot, expected.slot);
assert!(block.eq(&actual.value.block.unwrap()));
}
Err(e) => {
eprintln!("unexpected websocket receive timeout");
assert_eq!(Some(e), None);
}
}

// cleanup
exit.store(true, Ordering::Relaxed);
trigger.cancel();
client.shutdown().unwrap();
pubsub_service.close().unwrap();
}

#[test]
#[serial]
fn test_program_subscription() {
Expand All @@ -215,9 +328,10 @@ fn test_program_subscription() {
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bob = Keypair::new();

let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Expand Down Expand Up @@ -300,9 +414,10 @@ fn test_root_subscription() {
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);

let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Expand Down Expand Up @@ -350,8 +465,10 @@ fn test_slot_subscription() {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
Expand Down
54 changes: 50 additions & 4 deletions client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use {
crate::{
rpc_config::{
RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig,
RpcTransactionLogsConfig, RpcTransactionLogsFilter,
RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::{
Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult,
SlotInfo, SlotUpdate,
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
RpcSignatureResult, SlotInfo, SlotUpdate,
},
},
log::*,
Expand Down Expand Up @@ -173,6 +174,12 @@ pub type SignatureSubscription = (
Receiver<RpcResponse<RpcSignatureResult>>,
);

pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
pub type BlockSubscription = (
PubsubBlockClientSubscription,
Receiver<RpcResponse<RpcBlockUpdate>>,
);

pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
pub type ProgramSubscription = (
PubsubProgramClientSubscription,
Expand Down Expand Up @@ -266,6 +273,45 @@ impl PubsubClient {
Ok((result, receiver))
}

pub fn block_subscribe(
url: &str,
filter: RpcBlockSubscribeFilter,
config: Option<RpcBlockSubscribeConfig>,
) -> Result<BlockSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"blockSubscribe",
"params":[filter, config]
})
.to_string();

let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;

let t_cleanup = std::thread::spawn(move || {
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});

let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "blocks",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};

Ok((result, receiver))
}

pub fn logs_subscribe(
url: &str,
filter: RpcTransactionLogsFilter,
Expand Down
17 changes: 17 additions & 0 deletions client/src/rpc_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,23 @@ pub struct RpcSignatureSubscribeConfig {
pub enable_received_notification: Option<bool>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RpcBlockSubscribeFilter {
All,
MentionsAccountOrProgram(String),
}

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcBlockSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub encoding: Option<UiTransactionEncoding>,
pub transaction_details: Option<TransactionDetails>,
pub show_rewards: Option<bool>,
}

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcSignaturesForAddressConfig {
Expand Down
17 changes: 16 additions & 1 deletion client/src/rpc_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use {
transaction::{Result, TransactionError},
},
solana_transaction_status::{
ConfirmedTransactionStatusWithSignature, TransactionConfirmationStatus,
ConfirmedTransactionStatusWithSignature, TransactionConfirmationStatus, UiConfirmedBlock,
},
std::{collections::HashMap, fmt, net::SocketAddr},
thiserror::Error,
};

pub type RpcResult<T> = client_error::Result<Response<T>>;
Expand Down Expand Up @@ -424,6 +425,20 @@ pub struct RpcInflationReward {
pub commission: Option<u8>, // Vote account commission when the reward was credited
}

#[derive(Clone, Deserialize, Serialize, Debug, Error, Eq, PartialEq)]
pub enum RpcBlockUpdateError {
#[error("block store error")]
BlockStoreError,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RpcBlockUpdate {
pub slot: Slot,
pub block: Option<UiConfirmedBlock>,
pub err: Option<RpcBlockUpdateError>,
}

impl From<ConfirmedTransactionStatusWithSignature> for RpcConfirmedTransactionStatusWithSignature {
fn from(value: ConfirmedTransactionStatusWithSignature) -> Self {
let ConfirmedTransactionStatusWithSignature {
Expand Down
Loading

0 comments on commit 35ee48b

Please sign in to comment.