Skip to content

Commit 89df059

Browse files
authored
bridge: Add kafka receiver output (#1345)
… that is, support converting incoming webhooks to Kafka messages. Closes svix/monorepo-private#8508.
2 parents aae42f9 + 9287a8d commit 89df059

File tree

7 files changed

+252
-70
lines changed

7 files changed

+252
-70
lines changed

bridge/svix-bridge-plugin-kafka/src/config.rs

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
use rdkafka::{consumer::StreamConsumer, error::KafkaResult, ClientConfig};
1+
use rdkafka::{
2+
consumer::StreamConsumer, error::KafkaResult, producer::FutureProducer, ClientConfig,
3+
};
24
use serde::Deserialize;
3-
use svix_bridge_types::{SenderInput, SenderOutputOpts, TransformationConfig};
5+
use svix_bridge_types::{ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig};
46

5-
use crate::{input::KafkaConsumer, Result};
7+
use crate::{input::KafkaConsumer, KafkaProducer, Result};
68

79
#[derive(Clone, Deserialize)]
810
pub struct KafkaInputOpts {
@@ -40,25 +42,45 @@ impl KafkaInputOpts {
4042
// messages are committed manually after webhook delivery was successful.
4143
.set("enable.auto.commit", "false");
4244

43-
match self.security_protocol {
44-
KafkaSecurityProtocol::Plaintext => {
45-
config.set("security.protocol", "plaintext");
46-
}
47-
KafkaSecurityProtocol::Ssl => {
48-
config.set("security.protocol", "ssl");
49-
}
50-
KafkaSecurityProtocol::SaslSsl {
51-
sasl_username,
52-
sasl_password,
53-
} => {
54-
config
55-
.set("security.protocol", "sasl_ssl")
56-
.set("sasl.mechanisms", "SCRAM-SHA-512")
57-
.set("sasl.username", sasl_username)
58-
.set("sasl.password", sasl_password);
45+
self.security_protocol.apply(&mut config);
46+
if let Some(debug_contexts) = self.debug_contexts {
47+
if !debug_contexts.is_empty() {
48+
config.set("debug", debug_contexts);
5949
}
6050
}
6151

52+
config.create()
53+
}
54+
}
55+
56+
#[derive(Clone, Deserialize)]
pub struct KafkaOutputOpts {
    /// Comma-separated list of addresses.
    ///
    /// Example: `localhost:9094`
    #[serde(rename = "kafka_bootstrap_brokers")]
    pub bootstrap_brokers: String,

    /// The topic to publish messages to.
    #[serde(rename = "kafka_topic")]
    pub topic: String,

    /// The value for 'security.protocol' in the kafka config.
    #[serde(flatten)]
    pub security_protocol: KafkaSecurityProtocol,

    /// The 'debug' config value for rdkafka - enables more verbose logging
    /// for the selected 'contexts'
    #[serde(rename = "kafka_debug_contexts")]
    pub debug_contexts: Option<String>,
}
77+
78+
impl KafkaOutputOpts {
79+
pub(crate) fn create_producer(self) -> KafkaResult<FutureProducer> {
80+
let mut config = ClientConfig::new();
81+
config.set("bootstrap.servers", self.bootstrap_brokers);
82+
83+
self.security_protocol.apply(&mut config);
6284
if let Some(debug_contexts) = self.debug_contexts {
6385
if !debug_contexts.is_empty() {
6486
config.set("debug", debug_contexts);
@@ -82,6 +104,29 @@ pub enum KafkaSecurityProtocol {
82104
},
83105
}
84106

107+
impl KafkaSecurityProtocol {
108+
fn apply(self, config: &mut ClientConfig) {
109+
match self {
110+
KafkaSecurityProtocol::Plaintext => {
111+
config.set("security.protocol", "plaintext");
112+
}
113+
KafkaSecurityProtocol::Ssl => {
114+
config.set("security.protocol", "ssl");
115+
}
116+
KafkaSecurityProtocol::SaslSsl {
117+
sasl_username,
118+
sasl_password,
119+
} => {
120+
config
121+
.set("security.protocol", "sasl_ssl")
122+
.set("sasl.mechanisms", "SCRAM-SHA-512")
123+
.set("sasl.username", sasl_username)
124+
.set("sasl.password", sasl_password);
125+
}
126+
}
127+
}
128+
}
129+
85130
pub fn into_sender_input(
86131
name: String,
87132
opts: KafkaInputOpts,
@@ -95,3 +140,10 @@ pub fn into_sender_input(
95140
output,
96141
)?))
97142
}
143+
144+
pub fn into_receiver_output(
145+
name: String,
146+
opts: KafkaOutputOpts,
147+
) -> Result<Box<dyn ReceiverOutput>> {
148+
Ok(Box::new(KafkaProducer::new(name, opts)?))
149+
}
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
mod config;
22
mod error;
33
mod input;
4+
mod output;
45

56
pub use self::{
6-
config::{into_sender_input, KafkaInputOpts, KafkaSecurityProtocol},
7+
config::{
8+
into_receiver_output, into_sender_input, KafkaInputOpts, KafkaOutputOpts,
9+
KafkaSecurityProtocol,
10+
},
711
error::{Error, Result},
812
input::KafkaConsumer,
13+
output::KafkaProducer,
914
};
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use rdkafka::{
2+
error::KafkaError,
3+
producer::{FutureProducer, FutureRecord},
4+
util::Timeout,
5+
};
6+
use svix_bridge_types::{async_trait, BoxError, ForwardRequest, ReceiverOutput};
7+
8+
use crate::config::KafkaOutputOpts;
9+
10+
/// Forwards webhook payloads to kafka.
pub struct KafkaProducer {
    /// Identifier for this output, reported via `ReceiverOutput::name()`.
    name: String,
    /// Topic every forwarded payload is published to.
    topic: String,
    /// rdkafka producer handle, built from `KafkaOutputOpts`.
    producer: FutureProducer,
}
16+
17+
impl KafkaProducer {
18+
pub fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
19+
let topic = opts.topic.clone();
20+
let producer = opts.create_producer()?;
21+
22+
Ok(Self {
23+
name,
24+
topic,
25+
producer,
26+
})
27+
}
28+
}
29+
30+
#[async_trait]
31+
impl ReceiverOutput for KafkaProducer {
32+
fn name(&self) -> &str {
33+
&self.name
34+
}
35+
36+
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
37+
self.producer
38+
.send(
39+
FutureRecord::<(), _>::to(&self.topic)
40+
.payload(&serde_json::to_vec(&request.payload)?),
41+
Timeout::Never,
42+
)
43+
.await
44+
.map_err(|(e, _msg)| e)?;
45+
46+
Ok(())
47+
}
48+
}

bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
use std::time::Duration;
66

77
use rdkafka::{
8-
admin::{AdminClient, NewTopic, TopicReplication},
9-
client::DefaultClientContext,
108
producer::{FutureProducer, FutureRecord},
11-
types::RDKafkaErrorCode,
129
util::Timeout,
1310
ClientConfig,
1411
};
@@ -25,6 +22,8 @@ use wiremock::{
2522
Mock, MockServer, ResponseTemplate,
2623
};
2724

25+
use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};
26+
2827
#[ctor::ctor]
2928
fn test_setup() {
3029
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -44,10 +43,8 @@ fn test_setup() {
4443

4544
/// Time to wait for the plugin to connect.
4645
const CONNECT_WAIT_TIME: Duration = Duration::from_secs(10);
47-
/// Teimt to wait for the plugin to receive a message sent by a test.
46+
/// Time to wait for the plugin to receive a message sent by a test.
4847
const CONSUME_WAIT_TIME: Duration = Duration::from_secs(1);
49-
/// These tests assume a "vanilla" rabbitmq instance, using the default port, creds, exchange...
50-
const BROKER_HOST: &str = "localhost:9094";
5148

5249
fn get_test_plugin(
5350
svix_url: String,
@@ -87,14 +84,6 @@ fn kafka_producer() -> FutureProducer {
8784
.unwrap()
8885
}
8986

90-
fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
91-
// create does block I/O, but we don't care in tests
92-
ClientConfig::new()
93-
.set("bootstrap.servers", BROKER_HOST)
94-
.create()
95-
.unwrap()
96-
}
97-
9887
async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
9988
info!(topic, "publishing message");
10089
producer
@@ -106,40 +95,6 @@ async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
10695
.unwrap();
10796
}
10897

109-
async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
110-
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
111-
if let Err(e) = admin_client
112-
.create_topics(&[new_topic], &Default::default())
113-
.await
114-
{
115-
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
116-
panic!("{e}");
117-
}
118-
}
119-
}
120-
121-
async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
122-
admin_client
123-
.delete_topics(&[topic], &Default::default())
124-
.await
125-
.unwrap();
126-
}
127-
128-
macro_rules! unique_topic_name {
129-
() => {
130-
&format!(
131-
"test_{}_{}",
132-
file!()
133-
.split('/')
134-
.next_back()
135-
.unwrap()
136-
.strip_suffix(".rs")
137-
.unwrap(),
138-
line!()
139-
)
140-
};
141-
}
142-
14398
/// Push a msg on the queue.
14499
/// Check to see if the svix server sees a request.
145100
#[tokio::test]
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use rdkafka::{
4+
consumer::{Consumer, StreamConsumer},
5+
ClientConfig, Message,
6+
};
7+
use serde_json::json;
8+
use svix_bridge_plugin_kafka::{KafkaOutputOpts, KafkaProducer};
9+
use svix_bridge_types::{ForwardRequest, ReceiverOutput as _};
10+
11+
use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};
12+
13+
/// Time to wait for the consumer to be properly listening.
14+
const LISTEN_WAIT_TIME: Duration = Duration::from_secs(5);
15+
16+
/// End-to-end check: a `KafkaProducer` publishes exactly one message, and a
/// plain rdkafka consumer receives it with the expected JSON payload.
/// Requires a local broker at `BROKER_HOST`.
#[tokio::test]
async fn test_produce_ok() {
    let topic = unique_topic_name!();
    let admin_client = kafka_admin_client();
    create_topic(&admin_client, topic).await;

    // Start listening for messages
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", BROKER_HOST)
        .set("group.id", "svix_bridge_test_group_id")
        .create()
        .unwrap();

    consumer.subscribe(&[topic]).unwrap();

    // Shared so the spawned task can receive while this task can still poll
    // the same consumer for the "no further messages" assertion below.
    let consumer = Arc::new(consumer);
    let recv_join_hdl = tokio::spawn({
        let consumer = consumer.clone();
        // `detach()` makes the message owned so it can cross the task boundary.
        async move { consumer.recv().await.unwrap().detach() }
    });
    // Give the consumer time to join the group / get partitions assigned
    // before producing — presumably enough for CI; TODO confirm.
    tokio::time::sleep(LISTEN_WAIT_TIME).await;

    let payload = json!({ "test": "payload" });
    let payload_s = payload.to_string();

    // Only then actually send a message
    let producer = KafkaProducer::new(
        "test".into(),
        KafkaOutputOpts {
            bootstrap_brokers: BROKER_HOST.to_owned(),
            topic: topic.to_owned(),
            security_protocol: svix_bridge_plugin_kafka::KafkaSecurityProtocol::Plaintext,
            debug_contexts: None,
        },
    )
    .unwrap();
    producer.handle(ForwardRequest { payload }).await.unwrap();

    // Assert that the message is received
    let msg = recv_join_hdl.await.unwrap();
    assert_eq!(msg.payload(), Some(payload_s.as_bytes()));

    // Assert that no further messages are received in the next second
    tokio::time::timeout(Duration::from_secs(1), consumer.recv())
        .await
        .expect_err("there must be no further messages");

    delete_topic(&admin_client, topic).await;
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,54 @@
1+
use rdkafka::{
2+
admin::{AdminClient, NewTopic, TopicReplication},
3+
client::DefaultClientContext,
4+
types::RDKafkaErrorCode,
5+
ClientConfig,
6+
};
7+
8+
/// These tests assume a "vanilla" kafka instance, using the default port, creds, exchange...
9+
const BROKER_HOST: &str = "localhost:9094";
10+
11+
fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
12+
// create does block I/O, but we don't care in tests
13+
ClientConfig::new()
14+
.set("bootstrap.servers", BROKER_HOST)
15+
.create()
16+
.unwrap()
17+
}
18+
19+
async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
20+
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
21+
if let Err(e) = admin_client
22+
.create_topics(&[new_topic], &Default::default())
23+
.await
24+
{
25+
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
26+
panic!("{e}");
27+
}
28+
}
29+
}
30+
31+
/// Remove `topic` from the broker, panicking on any failure.
async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
    let result = admin_client
        .delete_topics(&[topic], &Default::default())
        .await;
    result.unwrap();
}
37+
38+
/// Builds a topic name unique per call site, of the form
/// `test_<source file stem>_<line number>`, so concurrently running tests
/// don't share (and clobber) each other's topics.
macro_rules! unique_topic_name {
    () => {
        &format!(
            "test_{}_{}",
            file!()
                .split('/')
                .next_back()
                .unwrap()
                .strip_suffix(".rs")
                .unwrap(),
            line!()
        )
    };
}
52+
153
mod kafka_consumer;
54+
mod kafka_producer;

0 commit comments

Comments
 (0)