
Implement super_stream #232


Merged
merged 28 commits into from
Oct 30, 2024
28 commits
3a73539
WIP: implementing create and delete superstream commands
DanielePalaia Oct 4, 2024
2687306
implementing route and partition commands
DanielePalaia Oct 8, 2024
4e18ce5
refactoring tests
DanielePalaia Oct 9, 2024
bbd5cb7
super_stream implementation
DanielePalaia Oct 9, 2024
f194288
superstream producer implementation
DanielePalaia Oct 10, 2024
8213cd9
super_stream_producer first basic version
DanielePalaia Oct 11, 2024
93e83e4
finalizing superstream_producer and tests
DanielePalaia Oct 14, 2024
76a55f2
better implementation/refactoring
DanielePalaia Oct 15, 2024
d00d3d1
making Messages as references to allow borrowing
DanielePalaia Oct 15, 2024
b72fb5a
implementing super_stream consumer
DanielePalaia Oct 16, 2024
cad06b1
some refactoring
DanielePalaia Oct 17, 2024
e88d542
fixing super_stream_consumer test
DanielePalaia Oct 19, 2024
7418439
fix clippy issue
DanielePalaia Oct 19, 2024
7d165ca
adding stream method to consumer delivery struct and adding examples
DanielePalaia Oct 20, 2024
e81e13c
fixing offset in SuperstreamConsumer
DanielePalaia Oct 21, 2024
5406df7
super_stream_consumer new approach
DanielePalaia Oct 22, 2024
6f606ca
implementing close()
DanielePalaia Oct 23, 2024
ba8b31b
implement filtering on super_stream
DanielePalaia Oct 23, 2024
8c8fa9c
adding super_stream_filtering tests
DanielePalaia Oct 24, 2024
ebb3a0b
error handling improvement in super_stream send
DanielePalaia Oct 25, 2024
4df6e67
improve super_stream send example
DanielePalaia Oct 25, 2024
364febf
add information to the example
Gsantomaggio Oct 28, 2024
9440ea5
small performance improvement in HashRoutingMurmurStrategy
DanielePalaia Oct 27, 2024
2be72b0
add information to the example
Gsantomaggio Oct 28, 2024
e8f7c12
close super stream consumer
Gsantomaggio Oct 28, 2024
e06146d
fix bug in super_stream_producer client created twice
DanielePalaia Oct 29, 2024
3196f1c
merging
DanielePalaia Oct 30, 2024
2238a20
updating README.md
DanielePalaia Oct 30, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -36,6 +36,7 @@ thiserror = "1.0"
async-trait = "0.1.51"
rand = "0.8"
dashmap = "5.3.4"
murmur3 = "0.5.2"

[dev-dependencies]
tracing-subscriber = "0.3.1"
16 changes: 16 additions & 0 deletions README.md
@@ -136,6 +136,22 @@ sleep(Duration::from_secs(10)).await;
handle.close().await?;
```

### Superstreams

The client supports super stream functionality.

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic across several cluster nodes.

See the [blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/) for more info.

You can use the SuperStreamProducer and SuperStreamConsumer structs, which internally use regular producers and consumers to operate on the streams composing the super stream.

Have a look at the examples to see how to work with super streams.

See the [Super Stream Producer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_super_stream.rs).

See the [Super Stream Consumer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs).
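The idea behind hash-based routing can be sketched in a few lines: hash the routing key extracted from the message, then take the result modulo the partition count to pick one of the partition streams. Note this standalone sketch is illustrative only — the real client hashes with murmur3 (via `HashRoutingMurmurStrategy`); `DefaultHasher` stands in here so the example has no external dependencies.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustration only: the client uses murmur3, not std's DefaultHasher.
fn pick_partition(routing_key: &str, partition_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    routing_key.hash(&mut hasher);
    hasher.finish() % partition_count
}

fn main() {
    // A 3-partition super stream is backed by streams named like
    // hello-rust-super-stream-0 .. hello-rust-super-stream-2.
    let partitions = 3;
    for id in ["0", "1", "2", "3"] {
        println!("message id {} -> partition {}", id, pick_partition(id, partitions));
    }
}
```

Because the mapping is deterministic, all messages with the same routing key always land on the same partition stream, which preserves per-key ordering.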

### Development

#### Compiling
63 changes: 63 additions & 0 deletions examples/receive_super_stream.rs
@@ -0,0 +1,63 @@
use futures::StreamExt;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let message_count = 100_000;
let super_stream = "hello-rust-super-stream";

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
println!(
"Super stream consumer example, consuming messages from the super stream {}",
super_stream
);
let mut super_stream_consumer: SuperStreamConsumer = environment
.super_stream_consumer()
.offset(OffsetSpecification::First)
.build(super_stream)
.await
.unwrap();

for _ in 0..message_count {
let delivery = super_stream_consumer.next().await.unwrap();
{
let delivery = delivery.unwrap();
println!(
"Got message: {:#?} from stream: {} with offset: {}",
delivery
.message()
.data()
.map(|data| String::from_utf8(data.to_vec()).unwrap())
.unwrap(),
delivery.stream(),
delivery.offset()
);
}
}

println!("Stopping super stream consumer...");
let _ = super_stream_consumer.handle().close().await;
println!("Super stream consumer stopped");
Ok(())
}
110 changes: 110 additions & 0 deletions examples/send_super_stream.rs
@@ -0,0 +1,110 @@
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

fn hash_strategy_value_extractor(message: &Message) -> String {
message
.application_properties()
.unwrap()
.get("id")
.unwrap()
.clone()
.try_into()
.unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let message_count = 100_000;
let super_stream = "hello-rust-super-stream";
let confirmed_messages = Arc::new(AtomicU32::new(0));
let notify_on_send = Arc::new(Notify::new());
let _ = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

let delete_stream = environment.delete_super_stream(super_stream).await;

match delete_stream {
Ok(_) => {
println!("Successfully deleted super stream {}", super_stream);
}
Err(err) => {
println!(
"Failed to delete super stream {}. error {}",
super_stream, err
);
}
}

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
println!(
"Super stream example. Sending {} messages to the super stream: {}",
message_count, super_stream
);
let mut super_stream_producer = environment
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
HashRoutingMurmurStrategy {
routing_extractor: &hash_strategy_value_extractor,
},
))
.build(super_stream)
.await
.unwrap();

for i in 0..message_count {
let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();
let msg = Message::builder()
.body(format!("super stream message_{}", i))
.application_properties()
.insert("id", i.to_string())
.message_builder()
.build();
super_stream_producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
inner_notifier.notify_one();
}
}
})
.await
.unwrap();
}

notify_on_send.notified().await;
println!(
"Successfully sent {} messages to the super stream {}",
message_count, super_stream
);
let _ = super_stream_producer.close().await;
Ok(())
}
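The confirmation pattern in the example above — every async confirm callback bumps a shared counter, and the callback that performs the final increment wakes the waiting task — can be sketched in isolation. This is a stand-alone sketch, not the client's API: std threads and a `Condvar` substitute for the tokio tasks and `Notify` used in the example.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Each "confirmation callback" increments the counter; the one that brings it
// to message_count signals completion, and the caller blocks until then.
fn confirm_all(message_count: u32) -> u32 {
    let confirmed = Arc::new(AtomicU32::new(0));
    let done = Arc::new((Mutex::new(false), Condvar::new()));

    for _ in 0..message_count {
        let counter = confirmed.clone();
        let pair = done.clone();
        thread::spawn(move || {
            // fetch_add returns the previous value, so the last callback
            // observes message_count - 1 and signals the waiter.
            if counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
                let (lock, cvar) = &*pair;
                *lock.lock().unwrap() = true;
                cvar.notify_one();
            }
        });
    }

    // Equivalent of `notify_on_send.notified().await` in the example.
    let (lock, cvar) = &*done;
    let mut finished = lock.lock().unwrap();
    while !*finished {
        finished = cvar.wait(finished).unwrap();
    }
    confirmed.load(Ordering::Relaxed)
}

fn main() {
    println!("confirmed {} messages", confirm_all(100));
}
```

The counter-plus-notify design avoids collecting 100,000 join handles or futures: the producer only needs to know when the last confirmation has arrived, not the order in which they arrive.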
11 changes: 11 additions & 0 deletions src/consumer.rs
@@ -263,6 +263,10 @@
impl ConsumerHandle {
/// Close the [`Consumer`] associated to this handle
pub async fn close(self) -> Result<(), ConsumerCloseError> {
self.internal_close().await
}

pub(crate) async fn internal_close(&self) -> Result<(), ConsumerCloseError> {
match self.0.closed.compare_exchange(false, true, SeqCst, SeqCst) {
Ok(false) => {
let response = self.0.client.unsubscribe(self.0.subscription_id).await?;
@@ -327,6 +331,7 @@
.0
.sender
.send(Ok(Delivery {
stream: self.0.stream.clone(),
subscription_id: self.0.subscription_id,
message,
offset,
@@ -355,6 +360,7 @@
/// Envelope from incoming message
#[derive(Debug)]
pub struct Delivery {
stream: String,
subscription_id: u8,
message: Message,
offset: u64,
@@ -366,6 +372,11 @@
self.subscription_id
}

/// Get a reference to the delivery's stream name.
pub fn stream(&self) -> &String {
&self.stream
}

Codecov warning: added lines src/consumer.rs#L376-L378 were not covered by tests.

/// Get a reference to the delivery's message.
pub fn message(&self) -> &Message {
&self.message
24 changes: 24 additions & 0 deletions src/environment.rs
@@ -11,6 +11,9 @@ use crate::{
error::StreamDeleteError,
producer::ProducerBuilder,
stream_creator::StreamCreator,
superstream::RoutingStrategy,
superstream_consumer::SuperStreamConsumerBuilder,
superstream_producer::SuperStreamProducerBuilder,
RabbitMQStreamResult,
};

@@ -49,6 +52,18 @@ impl Environment {
}
}

pub fn super_stream_producer(
&self,
routing_strategy: RoutingStrategy,
) -> SuperStreamProducerBuilder<NoDedup> {
SuperStreamProducerBuilder {
environment: self.clone(),
data: PhantomData,
filter_value_extractor: None,
route_strategy: routing_strategy,
}
}

/// Returns a builder for creating a consumer
pub fn consumer(&self) -> ConsumerBuilder {
ConsumerBuilder {
@@ -58,6 +73,15 @@
filter_configuration: None,
}
}

pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
SuperStreamConsumerBuilder {
environment: self.clone(),
offset_specification: OffsetSpecification::Next,
filter_configuration: None,
}
}

pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
Client::connect(self.options.client_options.clone()).await
}
20 changes: 20 additions & 0 deletions src/error.rs
@@ -110,6 +110,26 @@
#[error(transparent)]
Client(#[from] ClientError),
}

#[derive(Error, Debug)]
pub enum SuperStreamProducerPublishError {
#[error("Failed to send message to stream")]
ProducerPublishError(),
#[error("Failed to create a producer")]
ProducerCreateError(),
}

impl From<ProducerPublishError> for SuperStreamProducerPublishError {
fn from(_err: ProducerPublishError) -> Self {
SuperStreamProducerPublishError::ProducerPublishError()
}

Codecov warning: added lines src/error.rs#L123-L125 were not covered by tests.
}
impl From<ProducerCreateError> for SuperStreamProducerPublishError {
fn from(_err: ProducerCreateError) -> Self {
SuperStreamProducerPublishError::ProducerCreateError()
}

Codecov warning: added lines src/error.rs#L128-L130 were not covered by tests.
}

#[derive(Error, Debug)]
pub enum ProducerCloseError {
#[error("Failed to close producer for stream {stream} status {status:?}")]
7 changes: 7 additions & 0 deletions src/lib.rs
@@ -79,6 +79,9 @@ pub mod error;
mod offset_specification;
mod producer;
mod stream_creator;
mod superstream;
mod superstream_consumer;
mod superstream_producer;

pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;

@@ -94,6 +97,10 @@ pub mod types {
pub use crate::consumer::Delivery;
pub use crate::offset_specification::OffsetSpecification;
pub use crate::stream_creator::StreamCreator;
pub use crate::superstream::HashRoutingMurmurStrategy;
pub use crate::superstream::RoutingKeyRoutingStrategy;
pub use crate::superstream::RoutingStrategy;
pub use crate::superstream_consumer::SuperStreamConsumer;
pub use rabbitmq_stream_protocol::message::Message;
pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};

8 changes: 8 additions & 0 deletions src/producer.rs
@@ -245,6 +245,14 @@ impl<T> ProducerBuilder<T> {
self.filter_value_extractor = Some(f);
self
}

pub fn filter_value_extractor_arc(
mut self,
filter_value_extractor: Option<FilterValueExtractor>,
) -> Self {
self.filter_value_extractor = filter_value_extractor;
self
}
}

pub struct MessageAccumulator {