Skip to content

Commit

Permalink
feat: Add consumer parameters to nats source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
gbto committed Apr 24, 2024
1 parent 5b134a6 commit bf4d794
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 41 deletions.
81 changes: 76 additions & 5 deletions crates/arroyo-connectors/src/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,80 @@ impl NatsConnector {
pull_opt("stream", options).ok(),
pull_opt("subject", options).ok(),
) {
(Some(stream), None) => Some(SourceType::Stream(stream)),
(None, Some(subject)) => Some(SourceType::Subject(subject)),
(Some(stream), None) => Some(SourceType::Jetstream {
stream,
description: options.remove("consumer.description"),
ack_policy: options
.remove("consumer.ack_policy")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| AcknowledgmentPolicy::Explicit),
replay_policy: options.remove("consumer.replay_policy")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| ReplayPolicy::Instant),
ack_wait: options
.remove("consumer.ack_wait")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 30),
filter_subjects: options
.remove("consumer.filter_subjects")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| "".to_string()),
sample_frequency: options
.remove("consumer.sample_frequency")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 0),
num_replicas: options
.remove("consumer.num_replicas")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 1),
inactive_threshold: options
.remove("consumer.inactive_threshold")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 600),
rate_limit: options
.remove("consumer.rate_limit")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
max_ack_pending: options
.remove("consumer.max_ack_pending")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
max_deliver: options
.remove("consumer.max_deliver")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
max_waiting: options
.remove("consumer.max_waiting")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 1000000),
max_batch: options
.remove("consumer.max_batch")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 10000),
max_bytes: options
.remove("consumer.max_bytes")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| -1),
max_expires: options
.remove("consumer.max_expires")
.unwrap_or_default()
.parse()
.unwrap_or_else(|_| 300000),
}),
(None, Some(subject)) => Some(SourceType::Core { subject }),
(Some(_), Some(_)) => bail!("Exactly one of `stream` or `subject` must be set"),
(None, None) => bail!("One of `stream` or `subject` must be set"),
};
Expand All @@ -92,7 +164,6 @@ impl NatsConnector {
// TODO: Use parameters with `nats.` prefix for the NATS connection configuration
Ok(NatsTable {
connector_type: nats_table_type,
client_configs: HashMap::new(),
})
}
}
Expand Down Expand Up @@ -182,8 +253,8 @@ impl Connector for NatsConnector {
.as_ref()
.ok_or_else(|| anyhow!("sourceType is required"))?
{
SourceType::Subject(s) => s,
SourceType::Stream(s) => s,
SourceType::Jetstream { stream, .. } => stream,
SourceType::Core { subject, .. } => subject,
}
}
ConnectorType::Sink { sink_type, .. } => {
Expand Down
98 changes: 71 additions & 27 deletions crates/arroyo-connectors/src/nats/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::AcknowledgmentPolicy;
use super::ConnectorType;
use super::NatsConfig;
use super::NatsState;
use super::NatsTable;
use super::ReplayPolicy;
use super::{get_nats_client, SourceType};
use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
Expand Down Expand Up @@ -42,11 +44,11 @@ impl SourceOperator for NatsSourceFunc {
format!(
"nats-source-{}",
match &self.source_type {
SourceType::Subject(s) => {
s
SourceType::Jetstream { stream, .. } => {
stream
}
SourceType::Stream(s) => {
s
SourceType::Core { subject, .. } => {
subject
}
}
)
Expand Down Expand Up @@ -198,28 +200,68 @@ impl NatsSourceFunc {
let consumer_name = format!(
"{}-{}",
match &self.source_type {
SourceType::Subject(s) => {
s
SourceType::Jetstream { stream, .. } => {
stream
}
SourceType::Stream(s) => {
s
SourceType::Core { subject, .. } => {
subject
}
},
&ctx.task_info.operator_id.replace("operator_", "")
);

// TODO: Generate this `consumer_config` via a function that parses
// all optional parameters passed in the `client_configs` of the `table.json`
// and merges with the default values
let consumer_config = consumer::pull::Config {
name: Some(consumer_name.clone()),
replay_policy: consumer::ReplayPolicy::Instant,
inactive_threshold: Duration::from_secs(3600),
ack_policy: consumer::AckPolicy::Explicit,
ack_wait: Duration::from_secs(120),
num_replicas: 1,
deliver_policy,
..Default::default()
let consumer_config = match &self.source_type {
SourceType::Jetstream {
ack_policy,
replay_policy,
ack_wait,
description,
filter_subjects,
rate_limit,
sample_frequency,
num_replicas,
inactive_threshold,
max_ack_pending,
max_deliver,
max_waiting,
max_batch,
max_bytes,
max_expires,
..
} => consumer::pull::Config {
name: Some(consumer_name.clone()),
ack_policy: match ack_policy {
AcknowledgmentPolicy::Explicit => consumer::AckPolicy::Explicit,
AcknowledgmentPolicy::All => consumer::AckPolicy::All,
AcknowledgmentPolicy::None => consumer::AckPolicy::None,
},
replay_policy: match replay_policy {
ReplayPolicy::Original => consumer::ReplayPolicy::Original,
ReplayPolicy::Instant => consumer::ReplayPolicy::Instant,
},
ack_wait: Duration::from_secs(ack_wait.clone() as u64),
description: description.clone(),
filter_subjects: filter_subjects
.clone()
.split(",")
.map(|s| s.to_string())
.collect(),
rate_limit: rate_limit.clone() as u64,
sample_frequency: sample_frequency.clone() as u8,
num_replicas: num_replicas.clone() as usize,
inactive_threshold: Duration::from_secs(inactive_threshold.clone() as u64),
max_ack_pending: max_ack_pending.clone(),
max_deliver: max_deliver.clone(),
max_waiting: max_waiting.clone(),
max_batch: max_batch.clone(),
max_bytes: max_bytes.clone(),
max_expires: Duration::from_secs(max_expires.clone() as u64),
deliver_policy,
..Default::default()
},
_ => {
panic!("Core source type not supported for NATS consumer")
}
};

match stream.delete_consumer(&consumer_name).await {
Expand Down Expand Up @@ -304,12 +346,14 @@ impl NatsSourceFunc {
.expect("Failed instantiating NATS client");

match self.source_type.clone() {
SourceType::Stream(s) => {
SourceType::Jetstream { stream, .. } => {
let start_sequence = self
.get_start_sequence_number(ctx)
.await
.expect("Failed to get start sequence number");
let nats_stream = &self.get_nats_stream(nats_client.clone(), s.clone()).await;
let nats_stream = &self
.get_nats_stream(nats_client.clone(), stream.clone())
.await;
let mut messages = self
.create_nats_consumer(nats_stream, start_sequence, ctx)
.await
Expand Down Expand Up @@ -367,7 +411,7 @@ impl NatsSourceFunc {
sequence_numbers.insert(
ctx.task_info.operator_id.clone(),
NatsState {
stream_name: s.clone(),
stream_name: stream.clone(),
stream_sequence_number: message_info.stream_sequence.clone()
}
);
Expand All @@ -387,7 +431,7 @@ impl NatsSourceFunc {
},
None => {
break
info!("Finished reading message from {}", s.clone());
info!("Finished reading message from {}", stream.clone());
},
}
}
Expand Down Expand Up @@ -442,9 +486,9 @@ impl NatsSourceFunc {
}
Ok(SourceFinishType::Graceful)
}
SourceType::Subject(s) => {
SourceType::Core { subject, .. } => {
let mut messages = nats_client
.subscribe(s.clone())
.subscribe(subject.clone())
.await
.expect("Failed subscribing to NATS subject");
loop {
Expand All @@ -461,7 +505,7 @@ impl NatsSourceFunc {
},
None => {
break
info!("Finished reading message from {}", s.clone());
info!("Finished reading message from {}", subject.clone());
},
}
}
Expand Down
Loading

0 comments on commit bf4d794

Please sign in to comment.