Skip to content

Commit e544dc2

Browse files
committed
Implementing callback support and consumer_update response
1 parent 13faec7 commit e544dc2

File tree

9 files changed

+106
-37
lines changed

9 files changed

+106
-37
lines changed

protocol/src/commands/consumer_update.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ impl ConsumerUpdateCommand {
2727
active,
2828
}
2929
}
30+
31+
pub fn get_correlation_id(&self) -> u32 {
32+
self.correlation_id
33+
}
34+
35+
pub fn is_active(&self) -> u8 {
36+
self.active
37+
}
3038
}
3139

3240
impl Encoder for ConsumerUpdateCommand {

protocol/src/request/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,4 @@ mod tests {
398398
request_encode_decode_test::<SuperStreamRouteRequest>()
399399
}
400400

401-
#[test]
402-
fn request_consumer_update_request_command() {
403-
request_encode_decode_test::<ConsumerUpdateRequestCommand>()
404-
}
405401
}

src/client/dispatcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ where
168168
match result {
169169
Ok(item) => match item.correlation_id() {
170170
Some(correlation_id) => match item.kind_ref() {
171-
ResponseKind::ConsumerUpdate(consumer_update) => state.notify(item).await,
171+
ResponseKind::ConsumerUpdate(_) => state.notify(item).await,
172172
_ => state.dispatch(correlation_id, item).await,
173173
},
174174
None => state.notify(item).await,

src/client/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub use options::ClientOptions;
4242
use rabbitmq_stream_protocol::{
4343
commands::{
4444
close::{CloseRequest, CloseResponse},
45+
consumer_update_request::ConsumerUpdateRequestCommand,
4546
create_stream::CreateStreamCommand,
4647
create_super_stream::CreateSuperStreamCommand,
4748
credit::CreditCommand,
@@ -851,4 +852,15 @@ impl Client {
851852

852853
Ok(config)
853854
}
855+
856+
pub async fn consumer_update(
857+
&self,
858+
correlation_id: u32,
859+
offset_specification: OffsetSpecification,
860+
) -> RabbitMQStreamResult<GenericResponse> {
861+
self.send_and_receive(|_| {
862+
ConsumerUpdateRequestCommand::new(correlation_id, 1, offset_specification)
863+
})
864+
.await
865+
}
854866
}

src/consumer.rs

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use rabbitmq_stream_protocol::{
1515
commands::subscribe::OffsetSpecification, message::Message, ResponseKind,
1616
};
1717

18+
use core::option::Option::None;
19+
1820
use tokio::sync::mpsc::{channel, Receiver, Sender};
1921
use tracing::trace;
2022

@@ -26,13 +28,13 @@ use crate::{
2628
Client, ClientOptions, Environment, MetricsCollector,
2729
};
2830
use futures::{task::AtomicWaker, Stream};
29-
use rabbitmq_stream_protocol::commands::consumer_update::ConsumerUpdateCommand;
3031
use rand::rngs::StdRng;
3132
use rand::{seq::SliceRandom, SeedableRng};
3233

3334
type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
3435

35-
type ConsumerUpdateListener = Option<Arc<dyn Fn(bool, &MessageContext) -> u64 + Send + Sync>>;
36+
pub type ConsumerUpdateListener =
37+
Arc<dyn Fn(u8, &MessageContext) -> OffsetSpecification + Send + Sync>;
3638

3739
/// API for consuming RabbitMQ stream messages
3840
pub struct Consumer {
@@ -43,6 +45,7 @@ pub struct Consumer {
4345
}
4446

4547
struct ConsumerInternal {
48+
name: Option<String>,
4649
client: Client,
4750
stream: String,
4851
offset_specification: OffsetSpecification,
@@ -52,6 +55,7 @@ struct ConsumerInternal {
5255
waker: AtomicWaker,
5356
metrics_collector: Arc<dyn MetricsCollector>,
5457
filter_configuration: Option<FilterConfiguration>,
58+
consumer_update_listener: Option<ConsumerUpdateListener>,
5559
}
5660

5761
impl ConsumerInternal {
@@ -86,22 +90,17 @@ impl FilterConfiguration {
8690
}
8791

8892
pub struct MessageContext {
89-
consumer: Consumer,
90-
subscriber_name: String,
91-
reference: String,
93+
consumer_name: Option<String>,
94+
stream: String,
9295
}
9396

9497
impl MessageContext {
95-
pub fn get_consumer(self) -> Consumer {
96-
self.consumer
98+
pub fn get_name(self) -> Option<String> {
99+
self.consumer_name
97100
}
98101

99-
pub fn get_subscriber_name(self) -> String {
100-
self.subscriber_name
101-
}
102-
103-
pub fn get_reference(self) -> String {
104-
self.reference
102+
pub fn get_stream(self) -> String {
103+
self.stream
105104
}
106105
}
107106

@@ -111,6 +110,7 @@ pub struct ConsumerBuilder {
111110
pub(crate) environment: Environment,
112111
pub(crate) offset_specification: OffsetSpecification,
113112
pub(crate) filter_configuration: Option<FilterConfiguration>,
113+
pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
114114
pub(crate) client_provided_name: String,
115115
pub(crate) properties: HashMap<String, String>,
116116
}
@@ -172,6 +172,7 @@ impl ConsumerBuilder {
172172
let subscription_id = 1;
173173
let (tx, rx) = channel(10000);
174174
let consumer = Arc::new(ConsumerInternal {
175+
name: self.consumer_name.clone(),
175176
subscription_id,
176177
stream: stream.to_string(),
177178
client: client.clone(),
@@ -181,6 +182,7 @@ impl ConsumerBuilder {
181182
waker: AtomicWaker::new(),
182183
metrics_collector: collector,
183184
filter_configuration: self.filter_configuration.clone(),
185+
consumer_update_listener: None,
184186
});
185187
let msg_handler = ConsumerMessageHandler(consumer.clone());
186188
client.set_handler(msg_handler).await;
@@ -213,7 +215,7 @@ impl ConsumerBuilder {
213215

214216
if response.is_ok() {
215217
Ok(Consumer {
216-
name: self.consumer_name,
218+
name: self.consumer_name.clone(),
217219
receiver: rx,
218220
internal: consumer,
219221
})
@@ -245,6 +247,26 @@ impl ConsumerBuilder {
245247
self
246248
}
247249

250+
pub fn consumer_update(
251+
mut self,
252+
consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification
253+
+ Send
254+
+ Sync
255+
+ 'static,
256+
) -> Self {
257+
let f = Arc::new(consumer_update_listener);
258+
self.consumer_update_listener = Some(f);
259+
self
260+
}
261+
262+
pub fn consumer_update_arc(
263+
mut self,
264+
consumer_update_listener: Option<crate::consumer::ConsumerUpdateListener>,
265+
) -> Self {
266+
self.consumer_update_listener = consumer_update_listener;
267+
self
268+
}
269+
248270
pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
249271
self.properties = properties;
250272
self
@@ -386,8 +408,38 @@ impl MessageHandler for ConsumerMessageHandler {
386408
// TODO handle credit fail
387409
let _ = self.0.client.credit(self.0.subscription_id, 1).await;
388410
self.0.metrics_collector.consume(len as u64).await;
389-
} else {
390-
println!("other message arrived");
411+
} else if let ResponseKind::ConsumerUpdate(consumer_update) = response.kind_ref() {
412+
println!("Consumer update arrived");
413+
414+
if self.0.consumer_update_listener.is_none() {
415+
let offset_specification = OffsetSpecification::Next;
416+
let _ = self
417+
.0
418+
.client
419+
.consumer_update(
420+
consumer_update.get_correlation_id(),
421+
offset_specification,
422+
)
423+
.await;
424+
} else {
425+
let is_active = consumer_update.is_active();
426+
let message_context = MessageContext {
427+
consumer_name: self.0.name.clone(),
428+
stream: self.0.stream.clone(),
429+
};
430+
let consumer_update_listener_callback =
431+
self.0.consumer_update_listener.clone().unwrap();
432+
let offset_specification =
433+
consumer_update_listener_callback(is_active, &message_context);
434+
let _ = self
435+
.0
436+
.client
437+
.consumer_update(
438+
consumer_update.get_correlation_id(),
439+
offset_specification,
440+
)
441+
.await;
442+
}
391443
}
392444
}
393445
Some(Err(err)) => {

src/environment.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl Environment {
7474
environment: self.clone(),
7575
offset_specification: OffsetSpecification::Next,
7676
filter_configuration: None,
77+
consumer_update_listener: None,
7778
client_provided_name: String::from("rust-stream-consumer"),
7879
properties: HashMap::new(),
7980
}
@@ -84,6 +85,7 @@ impl Environment {
8485
environment: self.clone(),
8586
offset_specification: OffsetSpecification::Next,
8687
filter_configuration: None,
88+
consumer_update_listener: None,
8789
client_provided_name: String::from("rust-super-stream-consumer"),
8890
properties: HashMap::new(),
8991
}

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,16 @@ pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;
8787

8888
pub use crate::client::{Client, ClientOptions, MetricsCollector};
8989

90-
pub use crate::consumer::{Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration};
90+
pub use crate::consumer::{
91+
Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration, MessageContext,
92+
};
9193
pub use crate::environment::{Environment, EnvironmentBuilder, TlsConfiguration};
9294
pub use crate::producer::{Dedup, NoDedup, Producer, ProducerBuilder};
9395
pub mod types {
9496

9597
pub use crate::byte_capacity::ByteCapacity;
9698
pub use crate::client::{Broker, MessageResult, StreamMetadata};
97-
pub use crate::consumer::Delivery;
99+
pub use crate::consumer::{Delivery, MessageContext};
98100
pub use crate::offset_specification::OffsetSpecification;
99101
pub use crate::stream_creator::StreamCreator;
100102
pub use crate::superstream::HashRoutingMurmurStrategy;

src/superstream_consumer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::client::Client;
2-
use crate::consumer::Delivery;
2+
use crate::consumer::{ConsumerUpdateListener, Delivery};
33
use crate::error::{ConsumerCloseError, ConsumerDeliveryError};
44
use crate::superstream::DefaultSuperStreamMetadata;
55
use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration};
@@ -33,6 +33,7 @@ pub struct SuperStreamConsumerBuilder {
3333
pub(crate) environment: Environment,
3434
pub(crate) offset_specification: OffsetSpecification,
3535
pub(crate) filter_configuration: Option<FilterConfiguration>,
36+
pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
3637
pub(crate) client_provided_name: String,
3738
pub(crate) properties: HashMap<String, String>,
3839
}
@@ -64,6 +65,7 @@ impl SuperStreamConsumerBuilder {
6465
.offset(self.offset_specification.clone())
6566
.client_provided_name(self.client_provided_name.as_str())
6667
.filter_input(self.filter_configuration.clone())
68+
.consumer_update_arc(self.consumer_update_listener.clone())
6769
.properties(self.properties.clone())
6870
.build(partition.as_str())
6971
.await
@@ -101,6 +103,14 @@ impl SuperStreamConsumerBuilder {
101103
self
102104
}
103105

106+
pub fn consumer_update(
107+
mut self,
108+
consumer_update_listener: Option<ConsumerUpdateListener>,
109+
) -> Self {
110+
self.consumer_update_listener = consumer_update_listener;
111+
self
112+
}
113+
104114
pub fn client_provided_name(mut self, name: &str) -> Self {
105115
self.client_provided_name = String::from(name);
106116
self

tests/integration/client_test.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -457,16 +457,3 @@ async fn client_test_route_test() {
457457
test.partitions.get(0).unwrap()
458458
);
459459
}
460-
461-
#[tokio::test(flavor = "multi_thread")]
462-
async fn client_consumer_update_request_test() {
463-
let test = TestClient::create().await;
464-
465-
let response = test
466-
.client
467-
.consumer_update(OffsetSpecification::Next)
468-
.await
469-
.unwrap();
470-
471-
assert_eq!(&ResponseCode::Ok, response.code());
472-
}

0 commit comments

Comments
 (0)