Skip to content

Commit 2d26551

Browse files
add ability to include properties to the consumers during subscriptions (#249)
1 parent 028f7e9 commit 2d26551

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

src/consumer.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,11 @@ pub struct ConsumerBuilder {
8989
pub(crate) offset_specification: OffsetSpecification,
9090
pub(crate) filter_configuration: Option<FilterConfiguration>,
9191
pub(crate) client_provided_name: String,
92+
pub(crate) properties: HashMap<String, String>,
9293
}
9394

9495
impl ConsumerBuilder {
95-
pub async fn build(self, stream: &str) -> Result<Consumer, ConsumerCreateError> {
96+
pub async fn build(mut self, stream: &str) -> Result<Consumer, ConsumerCreateError> {
9697
// Connect to the user specified node first, then look for a random replica to connect to instead.
9798
// This is recommended for load balancing purposes
9899

@@ -161,18 +162,17 @@ impl ConsumerBuilder {
161162
let msg_handler = ConsumerMessageHandler(consumer.clone());
162163
client.set_handler(msg_handler).await;
163164

164-
let mut properties = HashMap::new();
165165
if let Some(filter_input) = self.filter_configuration {
166166
if !client.filtering_supported() {
167167
return Err(ConsumerCreateError::FilteringNotSupport);
168168
}
169169
for (index, item) in filter_input.filter_values.iter().enumerate() {
170170
let key = format!("filter.{}", index);
171-
properties.insert(key, item.to_owned());
171+
self.properties.insert(key, item.to_owned());
172172
}
173173

174174
let match_unfiltered_key = "match-unfiltered".to_string();
175-
properties.insert(
175+
self.properties.insert(
176176
match_unfiltered_key,
177177
filter_input.match_unfiltered.to_string(),
178178
);
@@ -184,7 +184,7 @@ impl ConsumerBuilder {
184184
stream,
185185
self.offset_specification,
186186
1,
187-
properties,
187+
self.properties.clone(),
188188
)
189189
.await?;
190190

@@ -221,6 +221,11 @@ impl ConsumerBuilder {
221221
self.filter_configuration = filter_configuration;
222222
self
223223
}
224+
225+
pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
226+
self.properties = properties;
227+
self
228+
}
224229
}
225230

226231
impl Consumer {

src/environment.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::time::Duration;
44

55
use crate::producer::NoDedup;
66
use crate::types::OffsetSpecification;
7+
use std::collections::HashMap;
78

89
use crate::{
910
client::{Client, ClientOptions, MetricsCollector},
@@ -74,6 +75,7 @@ impl Environment {
7475
offset_specification: OffsetSpecification::Next,
7576
filter_configuration: None,
7677
client_provided_name: String::from("rust-stream-consumer"),
78+
properties: HashMap::new(),
7779
}
7880
}
7981

@@ -83,6 +85,7 @@ impl Environment {
8385
offset_specification: OffsetSpecification::Next,
8486
filter_configuration: None,
8587
client_provided_name: String::from("rust-super-stream-consumer"),
88+
properties: HashMap::new(),
8689
}
8790
}
8891

src/superstream_consumer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub struct SuperStreamConsumerBuilder {
3434
pub(crate) offset_specification: OffsetSpecification,
3535
pub(crate) filter_configuration: Option<FilterConfiguration>,
3636
pub(crate) client_provided_name: String,
37+
pub(crate) properties: HashMap<String, String>,
3738
}
3839

3940
impl SuperStreamConsumerBuilder {
@@ -63,6 +64,7 @@ impl SuperStreamConsumerBuilder {
6364
.offset(self.offset_specification.clone())
6465
.client_provided_name(self.client_provided_name.as_str())
6566
.filter_input(self.filter_configuration.clone())
67+
.properties(self.properties.clone())
6668
.build(partition.as_str())
6769
.await
6870
.unwrap();

0 commit comments

Comments
 (0)