Skip to content

Commit

Permalink
add ws socket address in config and e2e test for event (#2597)
Browse files Browse the repository at this point in the history
* add ws socket in config and e2e test

* rebase & comments
  • Loading branch information
longbowlu authored and Home committed Jun 30, 2022
1 parent e12dc66 commit 94ad1a0
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 34 deletions.
1 change: 1 addition & 0 deletions crates/sui-config/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl<R: ::rand::RngCore + ::rand::CryptoRng> ConfigBuilder<R> {
network_address,
metrics_address: utils::available_local_socket_address(),
json_rpc_address: utils::available_local_socket_address(),
websocket_address: None,
consensus_config: Some(consensus_config),
enable_event_processing: false,
enable_gossip: true,
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct NodeConfig {
pub metrics_address: SocketAddr,
#[serde(default = "default_json_rpc_address")]
pub json_rpc_address: SocketAddr,
#[serde(default = "default_websocket_address")]
pub websocket_address: Option<SocketAddr>,

#[serde(skip_serializing_if = "Option::is_none")]
pub consensus_config: Option<ConsensusConfig>,
Expand Down Expand Up @@ -61,6 +63,11 @@ pub fn default_json_rpc_address() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9000)
}

pub fn default_websocket_address() -> Option<SocketAddr> {
use std::net::{IpAddr, Ipv4Addr};
Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9001))
}

impl Config for NodeConfig {}

impl NodeConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-config/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl NetworkConfig {
network_address: utils::new_network_address(),
metrics_address: utils::available_local_socket_address(),
json_rpc_address: utils::available_local_socket_address(),
websocket_address: Some(utils::available_local_socket_address()),
consensus_config: None,
enable_event_processing: true,
enable_gossip: true,
Expand Down
19 changes: 11 additions & 8 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,28 +170,31 @@ impl SuiNode {
tokio::spawn(server.serve().map_err(Into::into))
};

let (json_rpc_service, ws_subscription_service) = if config.consensus_config().is_some() {
(None, None)
let json_rpc_service = if config.consensus_config().is_some() {
None
} else {
let mut server = JsonRpcServerBuilder::new()?;
server.register_module(ReadApi::new(state.clone()))?;
server.register_module(FullNodeApi::new(state.clone()))?;
server.register_module(BcsApiImpl::new(state.clone()))?;

let server_handle = server.start(config.json_rpc_address).await?;
Some(server_handle)
};

let ws_handle = if let Some(event_handler) = state.event_handler.clone() {
let ws_server = WsServerBuilder::default().build("127.0.0.1:0").await?;
// TODO: we will change the conditions soon when we introduce txn subs
let ws_subscription_service = match (config.websocket_address, state.event_handler.clone())
{
(Some(ws_addr), Some(event_handler)) => {
let ws_server = WsServerBuilder::default().build(ws_addr).await?;
let server_addr = ws_server.local_addr()?;
let ws_handle =
ws_server.start(EventApiImpl::new(state.clone(), event_handler).into_rpc())?;

info!("Starting WS endpoint at ws://{}", server_addr);
Some(ws_handle)
} else {
None
};
(Some(server_handle), ws_handle)
}
_ => None,
};

let node = Self {
Expand Down
1 change: 1 addition & 0 deletions crates/sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl SuiCommand {

let mut fullnode_config = network_config.generate_fullnode_config();
fullnode_config.json_rpc_address = sui_config::node::default_json_rpc_address();
fullnode_config.websocket_address = sui_config::node::default_websocket_address();
fullnode_config.save(sui_config_dir.join(SUI_FULLNODE_CONFIG))?;

for (i, validator) in network_config
Expand Down
146 changes: 120 additions & 26 deletions crates/sui/tests/full_node_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@ use futures::{future, StreamExt};
use serde_json::json;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use sui::wallet_commands::{WalletCommandResult, WalletCommands, WalletContext};
use sui_core::authority::AuthorityState;
use sui_json::SuiJsonValue;
use sui_json_rpc_api::rpc_types::{SplitCoinResponse, TransactionResponse};
use sui_node::SuiNode;

use jsonrpsee::core::client::{Client, Subscription, SubscriptionClientT};
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use move_package::BuildConfig;
use serde_json::Value;
use std::net::SocketAddr;
use std::{collections::BTreeMap, sync::Arc};
use sui_json_rpc_api::rpc_types::{
SuiEvent, SuiMoveStruct, SuiMoveValue, SuiObjectInfo, SuiObjectRead,
};
use sui_swarm::memory::Swarm;
use sui_types::{
base_types::{ObjectID, ObjectRef, SuiAddress, TransactionDigest},
batch::UpdateItem,
messages::{BatchInfoRequest, BatchInfoResponseItem, Transaction},
};
use test_utils::network::setup_network_and_wallet;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tokio::time::{sleep, Duration};
use tracing::info;

Expand Down Expand Up @@ -55,6 +65,42 @@ async fn transfer_coin(
Ok((object_to_send, sender, receiver, digest))
}

async fn get_account_and_objects(
context: &mut WalletContext,
) -> Result<(SuiAddress, Vec<SuiObjectInfo>), anyhow::Error> {
let sender = context.config.accounts.get(0).cloned().unwrap();
let object_refs = context.gateway.get_objects_owned_by_address(sender).await?;
Ok((sender, object_refs))
}

async fn emit_move_events(
context: &mut WalletContext,
) -> Result<(SuiAddress, ObjectID, TransactionDigest), anyhow::Error> {
let (sender, object_refs) = get_account_and_objects(context).await.unwrap();
let gas_object = object_refs.get(0).unwrap().object_id;

let res = WalletCommands::CreateExampleNFT {
name: Some("example_nft_name".into()),
description: Some("example_nft_desc".into()),
url: Some("https://sui.io/_nuxt/img/sui-logo.8d3c44e.svg".into()),
gas: Some(gas_object),
gas_budget: Some(50000),
}
.execute(context)
.await?;

let (object_id, digest) = if let WalletCommandResult::CreateExampleNFT(SuiObjectRead::Exists(
obj,
)) = res
{
(obj.reference.object_id, obj.previous_transaction)
} else {
panic!("CreateExampleNFT command did not return WalletCommandResult::CreateExampleNFT(SuiObjectRead::Exists, got {:?}", res);
};

Ok((sender, object_id, digest))
}

async fn wait_for_tx(wait_digest: TransactionDigest, state: Arc<AuthorityState>) {
wait_for_all_txes(vec![wait_digest], state).await
}
Expand Down Expand Up @@ -347,31 +393,6 @@ async fn test_full_node_indexes() -> Result<(), anyhow::Error> {
Ok(())
}

// Test for syncing a node to an authority that already has many txes.
#[tokio::test]
async fn test_full_node_cold_sync() -> Result<(), anyhow::Error> {
telemetry_subscribers::init_for_testing();

let (swarm, mut context, _) = setup_network_and_wallet().await?;

let (_, _, _, _) = transfer_coin(&mut context).await?;
let (_, _, _, _) = transfer_coin(&mut context).await?;
let (_, _, _, _) = transfer_coin(&mut context).await?;
let (_transfered_object, sender, _receiver, digest) = transfer_coin(&mut context).await?;

sleep(Duration::from_millis(1000)).await;

let config = swarm.config().generate_fullnode_config();
let node = SuiNode::start(&config).await?;

wait_for_tx(digest, node.state().clone()).await;

let txes = node.state().get_transactions_from_addr(sender).await?;
assert_eq!(txes.last().unwrap().1, digest);

Ok(())
}

#[tokio::test]
async fn test_full_node_sync_flood() -> Result<(), anyhow::Error> {
telemetry_subscribers::init_for_testing();
Expand Down Expand Up @@ -463,3 +484,76 @@ async fn test_full_node_sync_flood() -> Result<(), anyhow::Error> {

Ok(())
}

/// Call this function to set up a network and a fullnode with subscription enabled.
/// Pass in an unique port for each test case otherwise they may interfere with one another.
async fn set_up_subscription(port: u16, swarm: &Swarm) -> Result<(SuiNode, Client), anyhow::Error> {
let ws_server_url = format!("127.0.0.1:{}", port);
let ws_addr: SocketAddr = ws_server_url.parse().unwrap();

let mut config = swarm.config().generate_fullnode_config();
config.websocket_address = Some(ws_addr);

let node = SuiNode::start(&config).await?;

let client = WsClientBuilder::default()
.build(&format!("ws://{}", ws_server_url))
.await?;
Ok((node, client))
}

#[tokio::test]
async fn test_full_node_sub_to_move_event_ok() -> Result<(), anyhow::Error> {
let (swarm, mut context, _) = setup_network_and_wallet().await?;
// Pass in an unique port for each test case otherwise they may interfere with one another.
let (node, ws_client) = set_up_subscription(6666, &swarm).await?;

let params = BTreeMap::<String, Value>::new();
let mut sub: Subscription<SuiEvent> = ws_client
.subscribe(
"sui_subscribeMoveEventsByType",
rpc_params!["0x2::devnet_nft::MintNFTEvent", params],
"sui_unsubscribeMoveEventsByType",
)
.await
.unwrap();

let (sender, object_id, digest) = emit_move_events(&mut context).await?;
wait_for_tx(digest, node.state().clone()).await;

match timeout(Duration::from_secs(5), sub.next()).await {
Ok(Some(Ok(SuiEvent::MoveEvent {
type_,
fields,
bcs: _,
}))) => {
assert_eq!(type_, "0x2::devnet_nft::MintNFTEvent");
assert_eq!(
fields,
SuiMoveStruct::WithFields(BTreeMap::from([
("creator".into(), SuiMoveValue::Address(sender)),
(
"name".into(),
SuiMoveValue::String("example_nft_name".into())
),
(
"object_id".into(),
SuiMoveValue::Address(SuiAddress::from(object_id))
),
]))
);
// TODO: verify bcs contents
}
other => panic!("Failed to get SuiEvent, but {:?}", other),
}

match timeout(Duration::from_secs(5), sub.next()).await {
Err(_) => (),
other => panic!(
"Expect to time out because no new events are coming in. Got {:?}",
other
),
}

Ok(())
}

0 comments on commit 94ad1a0

Please sign in to comment.