Skip to content

Commit 753707c

Browse files
committed
wip: test
And again it doesn't f_ing work locally
1 parent 0509e2a commit 753707c

File tree

4 files changed

+116
-50
lines changed

4 files changed

+116
-50
lines changed

bridge/svix-bridge-plugin-kafka/src/output.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct KafkaProducer {
1515
}
1616

1717
impl KafkaProducer {
18-
pub(crate) fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
18+
pub fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
1919
let topic = opts.topic.clone();
2020
let producer = opts.create_producer()?;
2121

@@ -36,7 +36,7 @@ impl ReceiverOutput for KafkaProducer {
3636
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
3737
self.producer
3838
.send(
39-
FutureRecord::<(), _>::to(&self.topic)
39+
FutureRecord::<(), _>::to(dbg!(&self.topic))
4040
.payload(&serde_json::to_vec(&request.payload)?),
4141
Timeout::Never,
4242
)

bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
use std::time::Duration;
66

77
use rdkafka::{
8-
admin::{AdminClient, NewTopic, TopicReplication},
9-
client::DefaultClientContext,
108
producer::{FutureProducer, FutureRecord},
11-
types::RDKafkaErrorCode,
129
util::Timeout,
1310
ClientConfig,
1411
};
@@ -25,6 +22,8 @@ use wiremock::{
2522
Mock, MockServer, ResponseTemplate,
2623
};
2724

25+
use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};
26+
2827
#[ctor::ctor]
2928
fn test_setup() {
3029
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -44,10 +43,8 @@ fn test_setup() {
4443

4544
/// Time to wait for the plugin to connect.
4645
const CONNECT_WAIT_TIME: Duration = Duration::from_secs(10);
47-
/// Teimt to wait for the plugin to receive a message sent by a test.
46+
/// Time to wait for the plugin to receive a message sent by a test.
4847
const CONSUME_WAIT_TIME: Duration = Duration::from_secs(1);
49-
/// These tests assume a "vanilla" rabbitmq instance, using the default port, creds, exchange...
50-
const BROKER_HOST: &str = "localhost:9094";
5148

5249
fn get_test_plugin(
5350
svix_url: String,
@@ -87,14 +84,6 @@ fn kafka_producer() -> FutureProducer {
8784
.unwrap()
8885
}
8986

90-
fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
91-
// create does block I/O, but we don't care in tests
92-
ClientConfig::new()
93-
.set("bootstrap.servers", BROKER_HOST)
94-
.create()
95-
.unwrap()
96-
}
97-
9887
async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
9988
info!(topic, "publishing message");
10089
producer
@@ -106,40 +95,6 @@ async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
10695
.unwrap();
10796
}
10897

109-
async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
110-
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
111-
if let Err(e) = admin_client
112-
.create_topics(&[new_topic], &Default::default())
113-
.await
114-
{
115-
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
116-
panic!("{e}");
117-
}
118-
}
119-
}
120-
121-
async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
122-
admin_client
123-
.delete_topics(&[topic], &Default::default())
124-
.await
125-
.unwrap();
126-
}
127-
128-
macro_rules! unique_topic_name {
129-
() => {
130-
&format!(
131-
"test_{}_{}",
132-
file!()
133-
.split('/')
134-
.next_back()
135-
.unwrap()
136-
.strip_suffix(".rs")
137-
.unwrap(),
138-
line!()
139-
)
140-
};
141-
}
142-
14398
/// Push a msg on the queue.
14499
/// Check to see if the svix server sees a request.
145100
#[tokio::test]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use std::time::Duration;
2+
3+
use rdkafka::{
4+
consumer::{Consumer, StreamConsumer},
5+
ClientConfig, Message,
6+
};
7+
use serde_json::json;
8+
use svix_bridge_plugin_kafka::{KafkaOutputOpts, KafkaProducer};
9+
use svix_bridge_types::{ForwardRequest, ReceiverOutput as _};
10+
11+
use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};
12+
13+
/// Time to wait for the consumer to be properly subscriber.
14+
const SUBSCRIBER_WAIT_TIME: Duration = Duration::from_secs(10);
15+
16+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
17+
async fn test_produce_ok() {
18+
let topic = unique_topic_name!();
19+
let admin_client = kafka_admin_client();
20+
create_topic(&admin_client, topic).await;
21+
tokio::time::sleep(SUBSCRIBER_WAIT_TIME).await;
22+
23+
let consumer: StreamConsumer = ClientConfig::new()
24+
.set("bootstrap.servers", BROKER_HOST)
25+
.set("group.id", "svix_bridge_test_group_id")
26+
.create()
27+
.unwrap();
28+
29+
consumer.subscribe(&[dbg!(topic)]).unwrap();
30+
tokio::time::sleep(SUBSCRIBER_WAIT_TIME).await;
31+
dbg!();
32+
33+
let producer = KafkaProducer::new(
34+
"test".into(),
35+
KafkaOutputOpts {
36+
bootstrap_brokers: BROKER_HOST.to_owned(),
37+
topic: topic.to_owned(),
38+
security_protocol: svix_bridge_plugin_kafka::KafkaSecurityProtocol::Plaintext,
39+
debug_contexts: None,
40+
},
41+
)
42+
.unwrap();
43+
44+
let payload = json!({ "test": "payload" });
45+
let payload_s = payload.to_string();
46+
producer.handle(ForwardRequest { payload }).await.unwrap();
47+
dbg!();
48+
49+
let msg = consumer.recv().await.unwrap();
50+
dbg!();
51+
assert_eq!(msg.payload(), Some(payload_s.as_bytes()));
52+
53+
tokio::time::timeout(Duration::from_secs(1), consumer.recv())
54+
.await
55+
.expect_err("there must be no further messages");
56+
57+
delete_topic(&admin_client, topic).await;
58+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,54 @@
1+
use rdkafka::{
2+
admin::{AdminClient, NewTopic, TopicReplication},
3+
client::DefaultClientContext,
4+
types::RDKafkaErrorCode,
5+
ClientConfig,
6+
};
7+
8+
/// These tests assume a "vanilla" kafka instance, using the default port, creds, exchange...
9+
const BROKER_HOST: &str = "localhost:9094";
10+
11+
fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
12+
// create does block I/O, but we don't care in tests
13+
ClientConfig::new()
14+
.set("bootstrap.servers", BROKER_HOST)
15+
.create()
16+
.unwrap()
17+
}
18+
19+
async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
20+
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
21+
if let Err(e) = admin_client
22+
.create_topics(&[new_topic], &Default::default())
23+
.await
24+
{
25+
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
26+
panic!("{e}");
27+
}
28+
}
29+
}
30+
31+
async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
32+
admin_client
33+
.delete_topics(&[topic], &Default::default())
34+
.await
35+
.unwrap();
36+
}
37+
38+
macro_rules! unique_topic_name {
39+
() => {
40+
&format!(
41+
"test_{}_{}",
42+
file!()
43+
.split('/')
44+
.next_back()
45+
.unwrap()
46+
.strip_suffix(".rs")
47+
.unwrap(),
48+
line!()
49+
)
50+
};
51+
}
52+
153
mod kafka_consumer;
54+
mod kafka_producer;

0 commit comments

Comments
 (0)