Skip to content

Commit

Permalink
[graphql/rpc] add graphql and indexer reader to sui test validator (M…
Browse files Browse the repository at this point in the history
oxade authored Nov 15, 2023
1 parent 6b478e5 commit 6afebbe
Showing 8 changed files with 172 additions and 25 deletions.
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -429,6 +429,7 @@ jobs:
run: |
cargo test --package sui-graphql-rpc --test e2e_tests --test examples_validation_tests --features pg_integration
cargo test --package sui-graphql-e2e-tests --features pg_integration
cargo test --package sui-cluster-test --test local_cluster_test --features pg_integration
env:
POSTGRES_HOST: localhost
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/sui-cluster-test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ shared-crypto.workspace = true

sui-indexer.workspace = true
sui-faucet.workspace = true
sui-graphql-rpc.workspace = true
sui-swarm.workspace = true
sui.workspace = true
sui-swarm-config.workspace = true
@@ -40,3 +41,6 @@ telemetry-subscribers.workspace = true
test-cluster.workspace = true
move-core-types.workspace = true
workspace-hack.workspace = true

[features]
pg_integration = []
65 changes: 50 additions & 15 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ use std::net::SocketAddr;
use std::path::Path;
use sui_config::Config;
use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG};
use sui_graphql_rpc::config::ConnectionConfig;
use sui_graphql_rpc::test_infra::cluster::{start_graphql_server, start_test_indexer_v2};
use sui_indexer::test_utils::start_test_indexer;
use sui_indexer::IndexerConfig;
use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
@@ -218,22 +220,55 @@ impl Cluster for LocalNewCluster {
// This cluster has fullnode handle, safe to unwrap
let fullnode_url = test_cluster.fullnode_handle.rpc_url.clone();

let migrated_methods = if options.use_indexer_experimental_methods {
IndexerConfig::all_implemented_methods()
} else {
vec![]
};
if options.pg_address.is_some() && indexer_address.is_some() {
let config = IndexerConfig {
db_url: Some(options.pg_address.clone().unwrap()),
rpc_client_url: fullnode_url.clone(),
rpc_server_url: indexer_address.as_ref().unwrap().ip().to_string(),
rpc_server_port: indexer_address.as_ref().unwrap().port(),
migrated_methods,
reset_db: true,
..Default::default()
};
start_test_indexer(config).await.unwrap();
if options.use_indexer_v2 {
// Start in writer mode
start_test_indexer_v2(
Some(options.pg_address.clone().unwrap()),
fullnode_url.clone(),
None,
options.use_indexer_experimental_methods,
)
.await;

// Start in reader mode
start_test_indexer_v2(
Some(options.pg_address.clone().unwrap()),
fullnode_url.clone(),
indexer_address.map(|x| x.to_string()),
options.use_indexer_experimental_methods,
)
.await;
} else {
let migrated_methods = if options.use_indexer_experimental_methods {
IndexerConfig::all_implemented_methods()
} else {
vec![]
};
let config = IndexerConfig {
db_url: Some(options.pg_address.clone().unwrap()),
rpc_client_url: fullnode_url.clone(),
rpc_server_url: indexer_address.as_ref().unwrap().ip().to_string(),
rpc_server_port: indexer_address.as_ref().unwrap().port(),
migrated_methods,
reset_db: true,
..Default::default()
};
start_test_indexer(config).await.unwrap();
}
}

if let Some(graphql_address) = &options.graphql_address {
let graphql_address = graphql_address.parse::<SocketAddr>().unwrap();
let graphql_connection_config = ConnectionConfig::new(
Some(graphql_address.port()),
Some(graphql_address.ip().to_string()),
options.pg_address.clone(),
None,
None,
);

start_graphql_server(graphql_connection_config.clone()).await;
}

// Let nodes connect to one another
12 changes: 10 additions & 2 deletions crates/sui-cluster-test/src/config.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use std::path::PathBuf;
// SPDX-License-Identifier: Apache-2.0
use clap::*;

#[derive(Parser, Clone, ValueEnum)]
#[derive(Parser, Clone, ValueEnum, Debug)]
pub enum Env {
Devnet,
Staging,
@@ -15,7 +15,7 @@ pub enum Env {
NewLocal,
}

#[derive(Parser)]
#[derive(Parser, Debug)]
#[clap(name = "", rename_all = "kebab-case")]
pub struct ClusterTestOpt {
#[clap(value_enum)]
@@ -29,6 +29,9 @@ pub struct ClusterTestOpt {
/// URL for the indexer RPC server
#[clap(long)]
pub indexer_address: Option<String>,
/// Use new version of indexer or not
#[clap(long)]
pub use_indexer_v2: bool,
/// URL for the Indexer Postgres DB
#[clap(long)]
pub pg_address: Option<String>,
@@ -37,6 +40,9 @@ pub struct ClusterTestOpt {
pub use_indexer_experimental_methods: bool,
#[clap(long)]
pub config_dir: Option<PathBuf>,
/// URL for the indexer RPC server
#[clap(long)]
pub graphql_address: Option<String>,
}

impl ClusterTestOpt {
@@ -50,6 +56,8 @@ impl ClusterTestOpt {
pg_address: None,
use_indexer_experimental_methods: false,
config_dir: None,
graphql_address: None,
use_indexer_v2: false,
}
}
}
52 changes: 52 additions & 0 deletions crates/sui-cluster-test/tests/local_cluster_test.rs
Original file line number Diff line number Diff line change
@@ -9,3 +9,55 @@ async fn cluster_test() {

ClusterTest::run(ClusterTestOpt::new_local()).await;
}

#[cfg(feature = "pg_integration")]
#[tokio::test]
async fn test_sui_cluster() {
use reqwest::StatusCode;
use sui_cluster_test::cluster::Cluster;
use sui_cluster_test::cluster::LocalNewCluster;
use sui_cluster_test::config::Env;
use sui_graphql_rpc::client::simple_client::SimpleClient;
use tokio::time::sleep;
let fullnode_rpc_port: u16 = 9020;
let indexer_rpc_port: u16 = 9124;
let pg_address = "postgres://postgres:postgrespw@localhost:5432/sui_indexer_v2".to_string();
let graphql_address = format!("127.0.0.1:{}", 8000);

let opts = ClusterTestOpt {
env: Env::NewLocal,
faucet_address: None,
fullnode_address: Some(format!("127.0.0.1:{}", fullnode_rpc_port)),
epoch_duration_ms: Some(60000),
indexer_address: Some(format!("127.0.0.1:{}", indexer_rpc_port)),
pg_address: Some(pg_address),
use_indexer_experimental_methods: false,
config_dir: None,
graphql_address: Some(graphql_address),
use_indexer_v2: true,
};

let _cluster = LocalNewCluster::start(&opts).await.unwrap();

let grphql_url: String = format!("http://127.0.0.1:{}", 8000);

sleep(std::time::Duration::from_secs(20)).await;

// Try JSON RPC URL
let query = r#"
{
checkpoint {
sequenceNumber
}
}
"#;
let resp = SimpleClient::new(grphql_url)
.execute_to_graphql(query.to_string(), true, vec![], vec![])
.await
.unwrap();

assert!(resp.errors().is_empty());
assert!(resp.http_status() == StatusCode::OK);
let resp_body = resp.response_body().data.clone().into_json().unwrap();
assert!(resp_body.get("checkpoint").is_some());
}
38 changes: 31 additions & 7 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ pub async fn start_cluster(

// Starts indexer
let (pg_store, pg_handle) =
start_test_indexer(Some(db_url), val_fn.rpc_url().to_string()).await;
start_test_indexer_v2(Some(db_url), val_fn.rpc_url().to_string(), None, true).await;

// Starts graphql server
let graphql_server_handle = start_graphql_server(graphql_connection_config.clone()).await;
@@ -97,8 +97,13 @@ pub async fn serve_executor(
});

// Starts indexer
let (pg_store, pg_handle) =
start_test_indexer(Some(db_url), format!("http://{}", executor_server_url)).await;
let (pg_store, pg_handle) = start_test_indexer_v2(
Some(db_url),
format!("http://{}", executor_server_url),
None,
true,
)
.await;

// Starts graphql server
let graphql_server_handle = start_graphql_server(graphql_connection_config.clone()).await;
@@ -121,7 +126,7 @@ pub async fn serve_executor(
}
}

async fn start_graphql_server(graphql_connection_config: ConnectionConfig) -> JoinHandle<()> {
pub async fn start_graphql_server(graphql_connection_config: ConnectionConfig) -> JoinHandle<()> {
let server_config = ServerConfig {
connection: graphql_connection_config,
..ServerConfig::default()
@@ -152,9 +157,11 @@ async fn start_validator_with_fullnode(internal_data_source_rpc_port: Option<u16
test_cluster_builder.build().await
}

pub async fn start_test_indexer(
pub async fn start_test_indexer_v2(
db_url: Option<String>,
rpc_url: String,
reader_mode_rpc_url: Option<String>,
use_indexer_experimental_methods: bool,
) -> (PgIndexerStoreV2, JoinHandle<Result<(), IndexerError>>) {
let db_url = db_url.unwrap_or_else(|| {
let pg_host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".into());
@@ -163,17 +170,34 @@ pub async fn start_test_indexer(
format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
});

let config = IndexerConfig {
let migrated_methods = if use_indexer_experimental_methods {
IndexerConfig::all_implemented_methods()
} else {
vec![]
};

// Default weiter mode
let mut config = IndexerConfig {
db_url: Some(db_url.clone()),
rpc_client_url: rpc_url,
migrated_methods: IndexerConfig::all_implemented_methods(),
migrated_methods,
reset_db: true,
fullnode_sync_worker: true,
rpc_server_worker: false,
use_v2: true,
..Default::default()
};

if let Some(reader_mode_rpc_url) = reader_mode_rpc_url {
let reader_mode_rpc_url = reader_mode_rpc_url
.parse::<SocketAddr>()
.expect("Unable to parse fullnode address");
config.fullnode_sync_worker = false;
config.rpc_server_worker = true;
config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
config.rpc_server_port = reader_mode_rpc_url.port();
}

let parsed_url = config.get_db_url().unwrap();
let blocking_pool = new_pg_connection_pool_impl(&parsed_url, Some(5)).unwrap();
if config.reset_db {
24 changes: 23 additions & 1 deletion crates/sui-test-validator/src/main.rs
Original file line number Diff line number Diff line change
@@ -42,6 +42,14 @@ struct Args {
#[clap(long, default_value = "9123")]
faucet_port: u16,

/// Host to start the GraphQl server on
#[clap(long, default_value = "127.0.0.1")]
graphql_host: String,

/// Port to start the GraphQl server on
#[clap(long, default_value = "8000")]
graphql_port: u16,

/// Port to start the Indexer RPC server on
#[clap(long, default_value = "9124")]
indexer_rpc_port: u16,
@@ -55,6 +63,10 @@ struct Args {
#[clap(long, default_value = "localhost")]
pg_host: String,

/// DB name for the Indexer Postgres DB
#[clap(long, default_value = "sui_indexer")]
pg_db_name: String,

/// The duration for epochs (defaults to one minute)
#[clap(long, default_value = "60000")]
epoch_duration_ms: u64,
@@ -66,6 +78,10 @@ struct Args {
/// TODO(gegao): remove this after indexer migration is complete.
#[clap(long)]
pub use_indexer_experimental_methods: bool,

/// If we should use the new version of the indexer
#[clap(long)]
pub use_indexer_v2: bool,
}

#[tokio::main]
@@ -78,13 +94,17 @@ async fn main() -> Result<()> {
let Args {
config_dir,
fullnode_rpc_port,
graphql_host,
graphql_port,
indexer_rpc_port,
pg_port,
pg_host,
pg_db_name,
epoch_duration_ms,
faucet_port,
with_indexer,
use_indexer_experimental_methods,
use_indexer_v2,
} = args;

// We don't pass epoch duration if we have a genesis config.
@@ -99,12 +119,14 @@ async fn main() -> Result<()> {
fullnode_address: Some(format!("127.0.0.1:{}", fullnode_rpc_port)),
indexer_address: with_indexer.then_some(format!("127.0.0.1:{}", indexer_rpc_port)),
pg_address: with_indexer.then_some(format!(
"postgres://postgres@{pg_host}:{pg_port}/sui_indexer"
"postgres://postgres@{pg_host}:{pg_port}/{pg_db_name}"
)),
faucet_address: None,
epoch_duration_ms,
use_indexer_experimental_methods,
config_dir,
graphql_address: Some(format!("{}:{}", graphql_host, graphql_port)),
use_indexer_v2,
})
.await?;

0 comments on commit 6afebbe

Please sign in to comment.