Skip to content

Commit

Permalink
flatten config options
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Nov 17, 2022
1 parent a8de610 commit b5acebd
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 68 deletions.
6 changes: 3 additions & 3 deletions examples/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ async fn main() -> Result<(), pulsar::Error> {
.with_name("my-producer2".to_string())
.with_options(producer::ProducerOptions {
batch_size: Some(4),
// compression: Some(compression::Compression::Lz4(compression::CompressionLz4::default())),
// compression: Some(compression::Compression::Zlib(compression::CompressionZlib::default())),
// compression: Some(compression::Compression::Zstd(compression::CompressionZstd::default())),
// compression: Some(compression::Compression::Lz4 { mode: lz4::block::CompressionMode::DEFAULT }),
// compression: Some(compression::Compression::Zlib { level: flate2::Compression::default() }),
// compression: Some(compression::Compression::Zstd { level: zstd::DEFAULT_COMPRESSION_LEVEL }),
compression: Some(compression::Compression::Snappy),
..Default::default()
})
Expand Down
87 changes: 32 additions & 55 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,43 @@
use lz4::block::CompressionMode;

/// Wrapper of supported compression algorithms
#[derive(Clone, Debug)]
#[derive(Default, Debug)]
pub enum Compression {
#[default]
None,
Lz4(CompressionLz4),
Zlib(CompressionZlib),
Zstd(CompressionZstd),
/// Options of the [lz4](https://lz4.github.io/lz4/) algorithm
Lz4 {
/// compression mode of lz4 to be used
mode: CompressionMode
},
/// Options of the [zlib](https://www.zlib.net/) algorithm
Zlib {
/// compression level of zlib to be used (0-9)
level: flate2::Compression,
},
/// Options of the [zstd](http://facebook.github.io/zstd/zstd_manual.html) algorithm
Zstd {
/// compression level of zstd to be used ([`zstd::compression_level_range()`])
level: i32,
},
/// Options of the [snappy](http://google.github.io/snappy/) algorithm
Snappy,
}

/// Options of the [lz4](https://lz4.github.io/lz4/) algorithm
#[derive(Debug)]
pub struct CompressionLz4 {
/// compression mode of lz4 to be used
pub mode: CompressionMode,
}

/// Options of the [zlib](https://www.zlib.net/) algorithm
#[derive(Default, Clone, Copy, Debug)]
pub struct CompressionZlib {
/// compression level of zlib to be used (0-9)
pub level: flate2::Compression,
}

/// Options of the [zstd](http://facebook.github.io/zstd/zstd_manual.html) algorithm
#[derive(Clone, Copy, Debug)]
pub struct CompressionZstd {
/// compression level of zstd to be used ([`zstd::compression_level_range()`])
pub level: i32,
}

impl Default for Compression {
fn default() -> Self {
Compression::None
}
}

impl Default for CompressionLz4 {
fn default() -> Self {
CompressionLz4 {
mode: CompressionMode::DEFAULT,
}
}
}

impl Default for CompressionZstd {
fn default() -> Self {
CompressionZstd {
level: zstd::DEFAULT_COMPRESSION_LEVEL,
}
}
}

impl Clone for CompressionLz4 {
impl Clone for Compression {
fn clone(&self) -> Self {
CompressionLz4 {
mode: match self.mode {
CompressionMode::HIGHCOMPRESSION(i) => CompressionMode::HIGHCOMPRESSION(i),
CompressionMode::FAST(i) => CompressionMode::FAST(i),
CompressionMode::DEFAULT => CompressionMode::DEFAULT,
}
match self {
Compression::None => Compression::None,
Compression::Lz4 { mode } => Compression::Lz4 {
mode: match mode {
CompressionMode::HIGHCOMPRESSION(i) => CompressionMode::HIGHCOMPRESSION(*i),
CompressionMode::FAST(i) => CompressionMode::FAST(*i),
CompressionMode::DEFAULT => CompressionMode::DEFAULT,
}
},
Compression::Zlib { level } => Compression::Zlib { level: level.clone() },
Compression::Zstd { level } => Compression::Zstd { level: level.clone() },
Compression::Snappy => Compression::Snappy
}
}
}
}
19 changes: 9 additions & 10 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,15 +427,15 @@ impl<Exe: Executor> TopicProducer<Exe> {

match compression {
None | Some(Compression::None) => {}
Some(Compression::Lz4(..)) => {
Some(Compression::Lz4 { .. }) => {
#[cfg(not(feature = "lz4"))]
return Err(Error::Custom("cannot create a producer with LZ4 compression because the 'lz4' cargo feature is not active".to_string()));
}
Some(Compression::Zlib(..)) => {
Some(Compression::Zlib { .. }) => {
#[cfg(not(feature = "flate2"))]
return Err(Error::Custom("cannot create a producer with zlib compression because the 'flate2' cargo feature is not active".to_string()));
}
Some(Compression::Zstd(..)) => {
Some(Compression::Zstd { .. }) => {
#[cfg(not(feature = "zstd"))]
return Err(Error::Custom("cannot create a producer with zstd compression because the 'zstd' cargo feature is not active".to_string()));
}
Expand Down Expand Up @@ -728,14 +728,14 @@ impl<Exe: Executor> TopicProducer<Exe> {
) -> Result<proto::CommandSendReceipt, Error> {
let compressed_message = match self.compression.clone() {
None | Some(Compression::None) => message,
Some(Compression::Lz4(compression)) => {
Some(Compression::Lz4 { mode }) => {
#[cfg(not(feature = "lz4"))]
return unimplemented!();

#[cfg(feature = "lz4")]
{
let compressed_payload: Vec<u8> =
lz4::block::compress(&message.payload[..], Some(compression.mode), false)
lz4::block::compress(&message.payload[..], Some(mode), false)
.map_err(ProducerError::Io)?;

message.uncompressed_size = Some(message.payload.len() as u32);
Expand All @@ -744,14 +744,13 @@ impl<Exe: Executor> TopicProducer<Exe> {
message
}
}
Some(Compression::Zlib(compression)) => {
Some(Compression::Zlib { level}) => {
#[cfg(not(feature = "flate2"))]
return unimplemented!();

#[cfg(feature = "flate2")]
{
let mut e =
flate2::write::ZlibEncoder::new(Vec::new(), compression.level);
let mut e = flate2::write::ZlibEncoder::new(Vec::new(), level);
e.write_all(&message.payload[..])
.map_err(ProducerError::Io)?;
let compressed_payload = e.finish().map_err(ProducerError::Io)?;
Expand All @@ -762,14 +761,14 @@ impl<Exe: Executor> TopicProducer<Exe> {
message
}
}
Some(Compression::Zstd(compression)) => {
Some(Compression::Zstd { level }) => {
#[cfg(not(feature = "zstd"))]
return unimplemented!();

#[cfg(feature = "zstd")]
{
let compressed_payload =
zstd::encode_all(&message.payload[..], compression.level).map_err(ProducerError::Io)?;
zstd::encode_all(&message.payload[..], level).map_err(ProducerError::Io)?;
message.uncompressed_size = Some(message.payload.len() as u32);
message.payload = compressed_payload;
message.compression = Some(3);
Expand Down

0 comments on commit b5acebd

Please sign in to comment.