Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ err_trail = { version = "0.11.0", features = ["tracing"] }
error_set = "0.9.1"
figlet-rs = "0.1.5"
figment = { version = "0.10.19", features = ["toml", "env"] }
flate2 = "1.1.5"
flume = "0.12.0"
futures = "0.3.31"
futures-util = "0.3.31"
Expand All @@ -143,6 +144,7 @@ integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
lazy_static = "1.5.0"
log = "0.4.29"
lz4_flex = "0.12.0"
mimalloc = "0.1"
mockall = "0.14.0"
nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
Expand Down Expand Up @@ -176,6 +178,7 @@ serde_yaml_ng = "0.10.0"
serial_test = "3.2.0"
server = { path = "core/server" }
simd-json = { version = "0.17.0", features = ["serde_impl"] }
snap = "1.1.1"
strum = { version = "0.27.2", features = ["derive"] }
strum_macros = "0.27.2"
sysinfo = "0.37.2"
Expand Down Expand Up @@ -211,6 +214,7 @@ uuid = { version = "1.19.0", features = [
] }
webpki-roots = "1.0.4"
zip = "7.0.0"
zstd = "0.13.3"

[profile.release]
lto = true
Expand Down
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ logos-codegen: 0.15.1, "Apache-2.0 OR MIT",
logos-derive: 0.15.1, "Apache-2.0 OR MIT",
lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib",
lz4_flex: 0.11.5, "MIT",
lz4_flex: 0.12.0, "MIT",
lzma-rust2: 0.15.4, "Apache-2.0",
macro_rules_attribute: 0.1.3, "MIT",
macro_rules_attribute-proc_macro: 0.1.3, "MIT",
Expand Down
1 change: 0 additions & 1 deletion core/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ human-repr = { workspace = true }
iggy = { workspace = true }
integration = { workspace = true }
nonzero_lit = { workspace = true }
rand = { workspace = true }
rayon = "1.11.0"
serde = { workspace = true }
sysinfo = { workspace = true }
Expand Down
9 changes: 8 additions & 1 deletion core/bench/report/src/plotting/text/subtext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,15 @@ impl BenchmarkReport {
};
let streams = format!("{} Streams", self.params.streams);

let compression = if let Some(algorithm) = &self.params.compression_algorithm {
let min_size = self.params.compression_min_size.unwrap_or(0);
format!("Compression: {} (min {} bytes)", algorithm, min_size)
} else {
"Compression: none".to_owned()
};

format!(
"{actors_info} • {streams} • {topics}{partitions} • {messages_per_batch} Msg/batch • {message_batches} Batches • {message_size} Bytes/msg • {user_data_print}",
"{actors_info} • {streams} • {topics}{partitions} • {compression} • {messages_per_batch} Msg/batch • {message_batches} Batches • {message_size} Bytes/msg • {user_data_print}",
)
}
}
Expand Down
8 changes: 7 additions & 1 deletion core/bench/report/src/prints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ impl BenchmarkReport {
let messages_per_batch = format!("{} messages per batch, ", self.params.messages_per_batch);
let message_batches = format!("{} message batches, ", self.params.message_batches);
let message_size = format!("{} bytes per message, ", self.params.message_size);
let compression = if let Some(algorithm) = &self.params.compression_algorithm {
let min_size = self.params.compression_min_size.unwrap_or(0);
format!("compression: {} (min {} bytes), ", algorithm, min_size)
} else {
"compression: none, ".to_owned()
};
let producers = if self.params.producers == 0 {
"".to_owned()
} else if self.params.benchmark_kind == BenchmarkKind::EndToEndProducingConsumerGroup
Expand All @@ -65,7 +71,7 @@ impl BenchmarkReport {
format!("{} consumer groups, ", self.params.consumer_groups)
};
println!();
let params_print = format!("Benchmark: {kind}, {producers}{consumers}{streams}{topics}{partitions}{consumer_groups}{total_messages}{messages_per_batch}{message_batches}{message_size}{total_size}\n",).blue();
let params_print = format!("Benchmark: {kind}, {producers}{consumers}{streams}{topics}{partitions}{consumer_groups}{compression}{total_messages}{messages_per_batch}{message_batches}{message_size}{total_size}\n",).blue();

info!("{}", params_print);

Expand Down
2 changes: 2 additions & 0 deletions core/bench/report/src/types/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct BenchmarkParams {
pub pretty_name: String,
pub bench_command: String,
pub params_identifier: String,
pub compression_algorithm: Option<String>,
pub compression_min_size: Option<u32>,
}

impl BenchmarkParams {
Expand Down
29 changes: 16 additions & 13 deletions core/bench/src/actors/producer/client/high_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,22 @@ impl BenchmarkInit for HighLevelProducerClient {
_ => Partitioning::balanced(),
};

self.producer = Some(
client
.producer(&stream_id_str, topic_id_str)?
.partitioning(partitioning)
.create_stream_if_not_exists()
.create_topic_if_not_exists(
self.config.partitions,
Some(1),
IggyExpiry::NeverExpire,
MaxTopicSize::ServerDefault,
)
.build(),
);
let mut builder = client
.producer(&stream_id_str, topic_id_str)?
.partitioning(partitioning)
.create_stream_if_not_exists()
.create_topic_if_not_exists(
self.config.partitions,
Some(1),
IggyExpiry::NeverExpire,
MaxTopicSize::ServerDefault,
);

if let Some(compression_config) = self.config.compression_config {
builder = builder.compressor(compression_config);
}

self.producer = Some(builder.build());
self.producer.as_mut().unwrap().init().await?;
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions core/bench/src/actors/producer/client/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct BenchmarkProducerConfig {
pub messages_per_batch: BenchmarkNumericParameter,
pub message_size: BenchmarkNumericParameter,
pub warmup_time: IggyDuration,
pub compression_config: Option<ClientCompressionConfig>,
}

pub trait ProducerClient: Send + Sync {
Expand Down
2 changes: 2 additions & 0 deletions core/bench/src/actors/producer/typed_benchmark_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl TypedBenchmarkProducer {
sampling_time: IggyDuration,
moving_average_window: u32,
limit_bytes_per_second: Option<IggyByteSize>,
compression_config: Option<ClientCompressionConfig>,
) -> Self {
let config = BenchmarkProducerConfig {
producer_id,
Expand All @@ -63,6 +64,7 @@ impl TypedBenchmarkProducer {
messages_per_batch,
message_size,
warmup_time,
compression_config,
};

if use_high_level_api {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl TypedBenchmarkProducingConsumer {
limit_bytes_per_second: Option<IggyByteSize>,
polling_kind: PollingKind,
origin_timestamp_latency_calculation: bool,
compression_config: Option<ClientCompressionConfig>,
) -> Self {
let producer_config = BenchmarkProducerConfig {
producer_id: actor_id,
Expand All @@ -72,6 +73,7 @@ impl TypedBenchmarkProducingConsumer {
messages_per_batch,
message_size,
warmup_time,
compression_config,
};

let consumer_config = BenchmarkConsumerConfig {
Expand Down
47 changes: 46 additions & 1 deletion core/bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::output::BenchmarkOutputCommand;
use super::props::{BenchmarkKindProps, BenchmarkTransportProps};
use super::{
defaults::{
DEFAULT_COMPRESSION_ALGORITHM, DEFAULT_COMPRESSION_MIN_PAYLOAD_SIZE,
DEFAULT_MESSAGE_BATCHES, DEFAULT_MESSAGE_SIZE, DEFAULT_MESSAGES_PER_BATCH,
DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_PERFORM_CLEANUP, DEFAULT_SAMPLING_TIME,
DEFAULT_SERVER_STDOUT_VISIBILITY, DEFAULT_SKIP_SERVER_START, DEFAULT_WARMUP_TIME,
Expand All @@ -31,7 +32,9 @@ use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::numeric_parameter::BenchmarkNumericParameter;
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
use iggy::prelude::{IggyByteSize, IggyDuration, TransportProtocol};
use iggy::prelude::{
ClientCompressionConfig, CompressionAlgorithm, IggyByteSize, IggyDuration, TransportProtocol,
};
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::path::Path;
Expand All @@ -58,6 +61,15 @@ pub struct IggyBenchArgs {
#[arg(long, short = 'b', group = "data_to_process")]
pub message_batches: Option<NonZeroU32>,

/// Message compression algorithm (requires high-level API)
/// Available algorithms: gzip, lz4, zstd, snappy
#[arg(long, short = 'c', value_parser = CompressionAlgorithm::from_str, default_value_t = DEFAULT_COMPRESSION_ALGORITHM)]
pub compression_algorithm: CompressionAlgorithm,

/// Minimum payload size (bytes) for compression. Messages smaller than `compression_min_size` are not compressed.
#[arg(long, default_value_t = DEFAULT_COMPRESSION_MIN_PAYLOAD_SIZE)]
pub compression_min_size: NonZeroU32,

/// Total size of all messages to process in bytes (aggregate, for all actors).
/// This argument is mutually exclusive with `message_batches`.
#[arg(long, short = 'T', group = "data_to_process")]
Expand Down Expand Up @@ -186,6 +198,15 @@ impl IggyBenchArgs {
.exit();
}

if self.compression_algorithm != CompressionAlgorithm::None && !self.high_level_api {
Self::command()
.error(
ErrorKind::ArgumentConflict,
"Compression requires --high-level-api flag",
)
.exit();
}

self.benchmark_kind.inner().validate();
}

Expand All @@ -201,6 +222,17 @@ impl IggyBenchArgs {
self.message_size
}

pub fn compression_config(&self) -> Option<ClientCompressionConfig> {
if self.compression_algorithm == CompressionAlgorithm::None {
None
} else {
Some(ClientCompressionConfig {
algorithm: self.compression_algorithm,
min_size: self.compression_min_size.into(),
})
}
}

pub const fn total_data(&self) -> Option<IggyByteSize> {
self.total_data
}
Expand Down Expand Up @@ -396,6 +428,11 @@ impl IggyBenchArgs {
transport.to_string(),
];

if self.compression_algorithm != CompressionAlgorithm::None {
parts.push(self.compression_algorithm.to_string());
parts.push(self.compression_min_size.to_string());
}

if let Some(remark) = &self.remark() {
parts.push(remark.clone());
}
Expand Down Expand Up @@ -447,6 +484,14 @@ impl IggyBenchArgs {
self.messages_per_batch(),
);

if self.compression_algorithm != CompressionAlgorithm::None {
let compression_str = format!(
" compression algorithm {} and min. byte size to realize compression {}",
self.compression_algorithm, self.compression_min_size
);
name.push_str(&compression_str);
}

if let Some(remark) = &self.remark() {
name = format!("{name} ({remark})");
}
Expand Down
5 changes: 4 additions & 1 deletion core/bench/src/args/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
* under the License.
*/

use iggy::prelude::IggyByteSize;
use iggy::prelude::{CompressionAlgorithm, IggyByteSize};
use nonzero_lit::u32;
use std::num::NonZeroU32;

pub const DEFAULT_COMPRESSION_ALGORITHM: CompressionAlgorithm = CompressionAlgorithm::None;
pub const DEFAULT_COMPRESSION_MIN_PAYLOAD_SIZE: NonZeroU32 = u32!(2048);

pub const DEFAULT_HTTP_SERVER_ADDRESS: &str = "127.0.0.1:3000";

pub const DEFAULT_TCP_SERVER_ADDRESS: &str = "127.0.0.1:8090";
Expand Down
8 changes: 7 additions & 1 deletion core/bench/src/benchmarks/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ pub trait Benchmarkable: Send {
.map(|rl| format!(" global rate limit: {rl}/s"))
.unwrap_or_default();

format!("{message_size}{messages_per_batch}{data}{rate_limit}",)
let compression = format!(
" compression algorithm used: {} with min. bytes size: {}b ",
self.args().compression_algorithm,
self.args().compression_min_size
);

format!("{message_size}{messages_per_batch}{data}{compression}{rate_limit}",)
}
}
Loading