Skip to content

Commit

Permalink
[forge] indexer SuccessCriteria and new test (#14851)
Browse files Browse the repository at this point in the history
* [forge] indexer SuccessCriteria and new test

* [forge] optional metrics sample includes indexer
  • Loading branch information
rustielin authored Oct 8, 2024
1 parent 8bb94b7 commit ebc1b64
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 15 deletions.
104 changes: 104 additions & 0 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ fn get_test_suite(
return Ok(test_suite);
} else if let Some(test_suite) = get_dag_test(test_name, duration, test_cmd) {
return Ok(test_suite);
} else if let Some(test_suite) = get_indexer_test(test_name) {
return Ok(test_suite);
}

// Otherwise, check the test name against the ungrouped test suites
Expand Down Expand Up @@ -691,6 +693,15 @@ fn get_land_blocking_test(
Some(test)
}

/// Attempts to match the test name to an indexer test
fn get_indexer_test(test_name: &str) -> Option<ForgeConfig> {
    if test_name == "indexer_test" {
        Some(indexer_test())
    } else {
        // The test name does not match an indexer test
        None
    }
}

/// Attempts to match the test name to a network benchmark test
fn get_netbench_test(test_name: &str) -> Option<ForgeConfig> {
let test = match test_name {
Expand Down Expand Up @@ -2367,6 +2378,99 @@ fn multiregion_benchmark_test() -> ForgeConfig {
)
}

/// Workload sweep with multiple stressful workloads for indexer
fn indexer_test() -> ForgeConfig {
    // Each entry pairs one sweep phase's workload (its TransactionTypeArg and TPS)
    // with that phase's success criteria. The criteria tuple is
    // (min_tps, fullnode-processed-batch latency, cache-worker-processed-batch latency,
    //  data-service-all-chunks-sent latency) — all latencies in seconds.
    let workloads_and_criteria = vec![
        (
            TransactionWorkload::new(TransactionTypeArg::CoinTransfer, 20000),
            (7000, 0.5, 0.5, 0.5),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::NoOp, 20000).with_num_modules(100),
            (8500, 0.5, 0.5, 0.5),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::ModifyGlobalResource, 6000)
                .with_transactions_per_account(1),
            (2000, 0.5, 0.5, 0.5),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::TokenV2AmbassadorMint, 20000)
                .with_unique_senders(),
            (3200, 0.5, 0.5, 0.5),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::PublishPackage, 200)
                .with_transactions_per_account(1),
            (28, 0.5, 0.5, 0.5),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::VectorPicture30k, 100),
            (100, 1.0, 1.0, 1.0),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::SmartTablePicture30KWith200Change, 100),
            (100, 1.0, 1.0, 1.0),
        ),
        (
            TransactionWorkload::new(
                TransactionTypeArg::TokenV1NFTMintAndTransferSequential,
                1000,
            ),
            (500, 0.5, 0.5, 0.5),
        ),
        (
            TransactionWorkload::new(TransactionTypeArg::TokenV1FTMintAndTransfer, 1000),
            (500, 0.5, 0.5, 0.5),
        ),
    ];
    let num_sweep = workloads_and_criteria.len();

    // Split the pairs into the workload list and the per-phase criteria specs,
    // preserving phase order.
    let (workload_list, criteria_specs): (Vec<_>, Vec<_>) =
        workloads_and_criteria.into_iter().unzip();
    let workloads = Workloads::TRANSACTIONS(workload_list);

    // Turn each criteria tuple into a SuccessCriteria with a strict latency
    // breakdown threshold on the three indexer gRPC stages.
    let criteria = criteria_specs
        .into_iter()
        .map(
            |(
                min_tps,
                indexer_fullnode_processed_batch,
                indexer_cache_worker_processed_batch,
                indexer_data_service_all_chunks_sent,
            )| {
                SuccessCriteria::new(min_tps).add_latency_breakdown_threshold(
                    LatencyBreakdownThreshold::new_strict(vec![
                        (
                            LatencyBreakdownSlice::IndexerFullnodeProcessedBatch,
                            indexer_fullnode_processed_batch,
                        ),
                        (
                            LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch,
                            indexer_cache_worker_processed_batch,
                        ),
                        (
                            LatencyBreakdownSlice::IndexerDataServiceAllChunksSent,
                            indexer_data_service_all_chunks_sent,
                        ),
                    ]),
                )
            },
        )
        .collect::<Vec<_>>();

    realistic_env_sweep_wrap(4, 4, LoadVsPerfBenchmark {
        test: Box::new(PerformanceBenchmark),
        workloads,
        criteria,
        background_traffic: background_traffic_for_sweep(num_sweep),
    })
}

/// This test runs a constant-TPS benchmark where the network includes
/// PFNs, and the transactions are submitted to the PFNs. This is useful
/// for measuring latencies when the system is not saturated.
Expand Down
1 change: 1 addition & 0 deletions testsuite/forge/src/backend/k8s/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl Factory for K8sFactory {
self.keep,
new_era,
self.use_port_forward,
self.enable_indexer,
)
.await
.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions testsuite/forge/src/backend/k8s/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct K8sSwarm {
era: Option<String>,
use_port_forward: bool,
chaos_experiment_ops: Box<dyn ChaosExperimentOps + Send + Sync>,
has_indexer: bool,
}

impl K8sSwarm {
Expand All @@ -75,6 +76,7 @@ impl K8sSwarm {
keep: bool,
era: Option<String>,
use_port_forward: bool,
has_indexer: bool,
) -> Result<Self> {
let kube_client = create_k8s_client().await?;

Expand Down Expand Up @@ -123,6 +125,7 @@ impl K8sSwarm {
kube_client: kube_client.clone(),
kube_namespace: kube_namespace.to_string(),
}),
has_indexer,
};

// test hitting the configured prometheus endpoint
Expand Down Expand Up @@ -446,6 +449,10 @@ impl Swarm for K8sSwarm {
/// Returns the default PFN node config by delegating to the
/// module-level `get_default_pfn_node_config` helper.
fn get_default_pfn_node_config(&self) -> NodeConfig {
get_default_pfn_node_config()
}

/// Reports whether this k8s swarm was created with an indexer
/// (set from the `has_indexer` flag passed to `K8sSwarm::new`).
fn has_indexer(&self) -> bool {
self.has_indexer
}
}

/// Amount of time to wait for genesis to complete
Expand Down
4 changes: 4 additions & 0 deletions testsuite/forge/src/backend/local/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,10 @@ impl Swarm for LocalSwarm {
/// Not implemented for local swarms; panics via `todo!()` if called.
fn get_default_pfn_node_config(&self) -> NodeConfig {
todo!()
}

/// Local swarms never run an indexer, so this always reports `false`.
fn has_indexer(&self) -> bool {
false
}
}

#[derive(Debug)]
Expand Down
74 changes: 70 additions & 4 deletions testsuite/forge/src/interface/prometheus_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ impl fmt::Debug for MetricSamples {
}
}

/// The default is an empty sample set — useful as a fallback when a
/// metric slice was not collected.
impl Default for MetricSamples {
    fn default() -> Self {
        Self::new(Vec::new())
    }
}

#[derive(Clone, Debug)]
pub struct SystemMetrics {
pub cpu_core_metrics: MetricSamples,
Expand Down Expand Up @@ -105,6 +111,11 @@ pub enum LatencyBreakdownSlice {
ConsensusProposalToOrdered,
ConsensusOrderedToCommit,
ConsensusProposalToCommit,
// Each of the indexer gRPC pipeline steps, in processing order
IndexerFullnodeProcessedBatch,
IndexerCacheWorkerProcessedBatch,
IndexerDataServiceAllChunksSent,
// TODO: add processor insertion into DB latency
}

#[derive(Clone, Debug)]
Expand All @@ -119,10 +130,16 @@ impl LatencyBreakdown {
self.0.keys().cloned().collect()
}

pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> &MetricSamples {
self.0
.get(slice)
.unwrap_or_else(|| panic!("Missing latency breakdown for {:?}", slice))
/// Returns the samples recorded for `slice`, or `None` when that
/// breakdown slice was not collected.
pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> Option<&MetricSamples> {
    self.0.get(slice)
}

/// Produces a new `LatencyBreakdown` combining `self` and `other`;
/// on a shared slice key, the entry from `other` wins.
pub fn join(&self, other: &LatencyBreakdown) -> LatencyBreakdown {
    let mut merged = self.0.clone();
    merged.extend(
        other
            .0
            .iter()
            .map(|(slice, samples)| (slice.clone(), samples.clone())),
    );
    LatencyBreakdown::new(merged)
}
}

Expand Down Expand Up @@ -210,5 +227,54 @@ pub async fn fetch_latency_breakdown(
MetricSamples::new(consensus_proposal_to_commit_samples),
);

if swarm.has_indexer() {
// These counters are defined in ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs
let indexer_fullnode_processed_batch_query =
r#"max(indexer_grpc_duration_in_secs{step="4", service_type="indexer_fullnode"})"#;
let indexer_cache_worker_processed_batch_query =
r#"max(indexer_grpc_duration_in_secs{step="4", service_type="cache_worker"})"#;
let indexer_data_service_all_chunks_sent_query =
r#"max(indexer_grpc_duration_in_secs{step="4", service_type="data_service"})"#;

let indexer_fullnode_processed_batch_samples = swarm
.query_range_metrics(
indexer_fullnode_processed_batch_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

let indexer_cache_worker_processed_batch_samples = swarm
.query_range_metrics(
indexer_cache_worker_processed_batch_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

let indexer_data_service_all_chunks_sent_samples = swarm
.query_range_metrics(
indexer_data_service_all_chunks_sent_query,
start_time as i64,
end_time as i64,
None,
)
.await?;

samples.insert(
LatencyBreakdownSlice::IndexerFullnodeProcessedBatch,
MetricSamples::new(indexer_fullnode_processed_batch_samples),
);
samples.insert(
LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch,
MetricSamples::new(indexer_cache_worker_processed_batch_samples),
);
samples.insert(
LatencyBreakdownSlice::IndexerDataServiceAllChunksSent,
MetricSamples::new(indexer_data_service_all_chunks_sent_samples),
);
}
Ok(LatencyBreakdown::new(samples))
}
4 changes: 4 additions & 0 deletions testsuite/forge/src/interface/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ pub trait Swarm: Sync + Send {
}

fn get_default_pfn_node_config(&self) -> NodeConfig;

/// Check if the swarm has an indexer. NOTE: in the future we should make this richer and include
/// indexer endpoints, similar to how we collect validator and fullnode endpoints.
fn has_indexer(&self) -> bool;
}

impl<T: ?Sized> SwarmExt for T where T: Swarm {}
Expand Down
4 changes: 3 additions & 1 deletion testsuite/forge/src/success_criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ impl LatencyBreakdownThreshold {
traffic_name_addition: &String,
) -> anyhow::Result<()> {
for (slice, threshold) in &self.thresholds {
let samples = metrics.get_samples(slice);
let samples = metrics
.get_samples(slice)
.expect("Could not get metric samples");
threshold.ensure_metrics_threshold(
&format!("{:?}{}", slice, traffic_name_addition),
samples.get(),
Expand Down
5 changes: 4 additions & 1 deletion testsuite/testcases/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ impl NetworkTest for dyn NetworkLoadTest {
.keys()
.into_iter()
.map(|slice| {
let slice_samples = phase_stats.latency_breakdown.get_samples(&slice);
let slice_samples = phase_stats
.latency_breakdown
.get_samples(&slice)
.expect("Could not get samples");
format!(
"{:?}: max: {:.3}, avg: {:.3}",
slice,
Expand Down
26 changes: 17 additions & 9 deletions testsuite/testcases/src/load_vs_perf_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Context;
use aptos_forge::{
args::TransactionTypeArg,
emitter::NumAccountsMode,
prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice},
prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice, MetricSamples},
success_criteria::{SuccessCriteria, SuccessCriteriaChecker},
EmitJob, EmitJobMode, EmitJobRequest, NetworkContext, NetworkContextSynchronizer, NetworkTest,
Result, Test, TxnStats, WorkflowProgress,
Expand Down Expand Up @@ -471,7 +471,7 @@ fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {

let mut table = Vec::new();
table.push(format!(
"{: <name_width$} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
"{: <name_width$} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
type_name,
"submitted/s",
"committed/s",
Expand All @@ -486,14 +486,18 @@ fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {
"pos->prop",
"prop->order",
"order->commit",
"actual dur"
"actual dur",
// optional indexer metrics
"idx_fn",
"idx_cache",
"idx_data",
));

for run_results in results {
for result in run_results {
let rate = result.stats.rate();
table.push(format!(
"{: <name_width$} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12}",
"{: <name_width$} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.2} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12} | {: <12.3} | {: <12.3} | {: <12.3}",
result.name,
rate.submitted,
rate.committed,
Expand All @@ -504,11 +508,15 @@ fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {
rate.p50_latency as f64 / 1000.0,
rate.p90_latency as f64 / 1000.0,
rate.p99_latency as f64 / 1000.0,
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(),
result.actual_duration.as_secs()
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).unwrap_or(&MetricSamples::default()).max_sample(),
result.actual_duration.as_secs(),
// optional indexer metrics
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerFullnodeProcessedBatch).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerCacheWorkerProcessedBatch).unwrap_or(&MetricSamples::default()).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::IndexerDataServiceAllChunksSent).unwrap_or(&MetricSamples::default()).max_sample(),
));
}
}
Expand Down

0 comments on commit ebc1b64

Please sign in to comment.