Skip to content

Commit

Permalink
bridge: Error refactoring (#1342)
Browse files Browse the repository at this point in the history
Preparation for adding kafka `ReceiverOutput`.

Part of svix/monorepo-private#8508.
  • Loading branch information
svix-jplatte authored Jun 20, 2024
2 parents 1568915 + 5c7c42a commit aae42f9
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 63 deletions.
2 changes: 2 additions & 0 deletions bridge/.config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.default]
slow-timeout = { period = "30s", terminate-after = 2 }
1 change: 1 addition & 0 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bridge/svix-bridge-plugin-kafka/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl SenderInput for KafkaConsumer {
self.transformer_tx = tx;
}

async fn run(&self) -> std::io::Result<()> {
async fn run(&self) {
let mut fails: u64 = 0;
let mut last_fail = Instant::now();

Expand Down
14 changes: 7 additions & 7 deletions bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn test_consume_ok() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -245,7 +245,7 @@ async fn test_consume_transformed_json_ok() {
plugin.set_transformer(Some(transformer_tx));

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -330,7 +330,7 @@ async fn test_consume_transformed_string_ok() {
plugin.set_transformer(Some(transformer_tx));

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -365,7 +365,7 @@ async fn test_missing_app_id_nack() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});

// Wait for the consumer to connect
Expand Down Expand Up @@ -415,7 +415,7 @@ async fn test_missing_event_type_nack() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});

// Wait for the consumer to connect
Expand Down Expand Up @@ -467,7 +467,7 @@ async fn test_consume_svix_503() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -510,7 +510,7 @@ async fn test_consume_svix_offline() {
drop(mock_server);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down
1 change: 1 addition & 0 deletions bridge/svix-bridge-plugin-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
svix-bridge-types = { path = "../svix-bridge-types" }
thiserror = "1.0.61"
tokio = { version = "1", features = ["full"] }
tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
Expand Down
40 changes: 10 additions & 30 deletions bridge/svix-bridge-plugin-queue/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,23 @@
pub use omniqueue::QueueError;
use svix_bridge_types::svix;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
Payload(String),
Json(serde_json::Error),
Queue(QueueError),
Svix(svix::error::Error),
#[error("json error: {0}")]
Json(#[from] serde_json::Error),
#[error("queue error: {0}")]
Queue(#[from] QueueError),
#[error("svix API error: {0}")]
Svix(#[from] svix::error::Error),
#[error("{0}")]
Generic(String),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<svix::error::Error> for Error {
fn from(value: svix::error::Error) -> Self {
Error::Svix(value)
}
}

impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Error::Json(value)
}
}

impl From<QueueError> for Error {
fn from(value: QueueError) -> Self {
Error::Queue(value)
}
}

impl From<String> for Error {
fn from(value: String) -> Self {
Self::Generic(value)
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;

impl From<Error> for std::io::Error {
fn from(value: Error) -> Self {
match value {
Error::Payload(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Error::Json(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Error::Queue(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Error::Svix(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Expand Down
2 changes: 1 addition & 1 deletion bridge/svix-bridge-plugin-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ trait Consumer {
}
}

async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> std::io::Result<()> {
async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> ! {
let mut fails: u64 = 0;
let mut last_fail = Instant::now();
let system_name = consumer.system();
Expand Down
12 changes: 5 additions & 7 deletions bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use omniqueue::DynProducer;
use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput};
use svix_bridge_types::{async_trait, BoxError, ForwardRequest, ReceiverOutput};

use crate::{config::QueueOutputOpts, error::Result};

Expand Down Expand Up @@ -42,11 +42,9 @@ impl ReceiverOutput for QueueForwarder {
fn name(&self) -> &str {
&self.name
}
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()> {
Ok(self
.sender
.send_serde_json(&request.payload)
.await
.map_err(crate::Error::from)?)

async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
self.sender.send_serde_json(&request.payload).await?;
Ok(())
}
}
4 changes: 3 additions & 1 deletion bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ impl SenderInput for QueueSender {
fn name(&self) -> &str {
&self.name
}

fn set_transformer(&mut self, tx: Option<TransformerTx>) {
self.transformer_tx = tx;
}
async fn run(&self) -> std::io::Result<()> {

async fn run(&self) {
run_inner(self).await
}
}
6 changes: 4 additions & 2 deletions bridge/svix-bridge-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,19 @@ pub trait SenderInput: Send {
/// For plugins that want to run JS transformations on payloads.
/// Giving them a sender lets them pass messages to the JS executor.
fn set_transformer(&mut self, _tx: Option<TransformerTx>) {}
async fn run(&self) -> std::io::Result<()>;
async fn run(&self);
}

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Represents something we can hand a webhook payload to.
/// Aka a "forwarder."
///
/// To start, we're only using this in conjunction with an HTTP server "owned" by the bridge binary.
#[async_trait]
pub trait ReceiverOutput: Send + Sync {
fn name(&self) -> &str;
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()>;
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError>;
}

#[derive(Deserialize, Debug, Clone, Default)]
Expand Down
13 changes: 3 additions & 10 deletions bridge/svix-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,10 @@ async fn supervise_senders(inputs: Vec<Box<dyn SenderInput>>) -> Result<()> {
set.spawn(async move {
// FIXME: needs much better signaling for termination
loop {
let fut = input.run();
// If this future returns, the consumer terminated unexpectedly.
if let Err(e) = fut.await {
tracing::warn!(
"sender input {} unexpectedly terminated: {}",
input.name(),
e
);
} else {
tracing::warn!("sender input {} unexpectedly terminated", input.name());
}
input.run().await;

tracing::warn!("sender input {} unexpectedly terminated", input.name());
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
Expand Down
9 changes: 5 additions & 4 deletions bridge/svix-bridge/src/webhook_receiver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use axum::{
};
use serde_json::json;
use svix_bridge_types::{
async_trait, svix::webhooks::Webhook, ForwardRequest, ReceiverOutput, TransformationConfig,
TransformerInput, TransformerInputFormat, TransformerJob, TransformerOutput,
async_trait, svix::webhooks::Webhook, BoxError, ForwardRequest, ReceiverOutput,
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
TransformerOutput,
};
use tower::{Service, ServiceExt};

Expand Down Expand Up @@ -37,8 +38,8 @@ impl ReceiverOutput for FakeReceiverOutput {
"fake output"
}

async fn handle(&self, request: ForwardRequest) -> std::io::Result<()> {
self.tx.send(request.payload).unwrap();
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
self.tx.send(request.payload)?;
Ok(())
}
}
Expand Down

0 comments on commit aae42f9

Please sign in to comment.