Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bridge: Error refactoring #1342

Merged
merged 5 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bridge/.config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.default]
slow-timeout = { period = "30s", terminate-after = 2 }
1 change: 1 addition & 0 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bridge/svix-bridge-plugin-kafka/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl SenderInput for KafkaConsumer {
self.transformer_tx = tx;
}

async fn run(&self) -> std::io::Result<()> {
async fn run(&self) {
let mut fails: u64 = 0;
let mut last_fail = Instant::now();

Expand Down
14 changes: 7 additions & 7 deletions bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn test_consume_ok() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -245,7 +245,7 @@ async fn test_consume_transformed_json_ok() {
plugin.set_transformer(Some(transformer_tx));

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -330,7 +330,7 @@ async fn test_consume_transformed_string_ok() {
plugin.set_transformer(Some(transformer_tx));

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -365,7 +365,7 @@ async fn test_missing_app_id_nack() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});

// Wait for the consumer to connect
Expand Down Expand Up @@ -415,7 +415,7 @@ async fn test_missing_event_type_nack() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});

// Wait for the consumer to connect
Expand Down Expand Up @@ -467,7 +467,7 @@ async fn test_consume_svix_503() {
let plugin = get_test_plugin(mock_server.uri(), topic, None);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down Expand Up @@ -510,7 +510,7 @@ async fn test_consume_svix_offline() {
drop(mock_server);

let handle = tokio::spawn(async move {
plugin.run().await.unwrap();
plugin.run().await;
});
// Wait for the consumer to connect
tokio::time::sleep(CONNECT_WAIT_TIME).await;
Expand Down
1 change: 1 addition & 0 deletions bridge/svix-bridge-plugin-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
svix-bridge-types = { path = "../svix-bridge-types" }
thiserror = "1.0.61"
tokio = { version = "1", features = ["full"] }
tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
Expand Down
40 changes: 10 additions & 30 deletions bridge/svix-bridge-plugin-queue/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,23 @@
pub use omniqueue::QueueError;
use svix_bridge_types::svix;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
Payload(String),
Json(serde_json::Error),
Queue(QueueError),
Svix(svix::error::Error),
#[error("json error: {0}")]
Json(#[from] serde_json::Error),
#[error("queue error: {0}")]
Queue(#[from] QueueError),
#[error("svix API error: {0}")]
Svix(#[from] svix::error::Error),
#[error("{0}")]
Generic(String),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<svix::error::Error> for Error {
fn from(value: svix::error::Error) -> Self {
Error::Svix(value)
}
}

impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Error::Json(value)
}
}

impl From<QueueError> for Error {
fn from(value: QueueError) -> Self {
Error::Queue(value)
}
}

impl From<String> for Error {
fn from(value: String) -> Self {
Self::Generic(value)
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;

impl From<Error> for std::io::Error {
fn from(value: Error) -> Self {
match value {
Error::Payload(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Error::Json(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Error::Queue(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Error::Svix(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
Expand Down
2 changes: 1 addition & 1 deletion bridge/svix-bridge-plugin-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ trait Consumer {
}
}

async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> std::io::Result<()> {
async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> ! {
let mut fails: u64 = 0;
let mut last_fail = Instant::now();
let system_name = consumer.system();
Expand Down
12 changes: 5 additions & 7 deletions bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use omniqueue::DynProducer;
use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput};
use svix_bridge_types::{async_trait, BoxError, ForwardRequest, ReceiverOutput};

use crate::{config::QueueOutputOpts, error::Result};

Expand Down Expand Up @@ -42,11 +42,9 @@ impl ReceiverOutput for QueueForwarder {
fn name(&self) -> &str {
&self.name
}
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()> {
Ok(self
.sender
.send_serde_json(&request.payload)
.await
.map_err(crate::Error::from)?)

async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
self.sender.send_serde_json(&request.payload).await?;
Ok(())
}
}
4 changes: 3 additions & 1 deletion bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ impl SenderInput for QueueSender {
fn name(&self) -> &str {
&self.name
}

fn set_transformer(&mut self, tx: Option<TransformerTx>) {
self.transformer_tx = tx;
}
async fn run(&self) -> std::io::Result<()> {

async fn run(&self) {
run_inner(self).await
}
}
6 changes: 4 additions & 2 deletions bridge/svix-bridge-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,19 @@ pub trait SenderInput: Send {
/// For plugins that want to run JS transformations on payloads.
/// Giving them a sender lets them pass messages to the JS executor.
fn set_transformer(&mut self, _tx: Option<TransformerTx>) {}
async fn run(&self) -> std::io::Result<()>;
async fn run(&self);
}

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Represents something we can hand a webhook payload to.
/// Aka a "forwarder."
///
/// To start, we're only using this in conjunction with an HTTP server "owned" by the bridge binary.
#[async_trait]
pub trait ReceiverOutput: Send + Sync {
fn name(&self) -> &str;
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()>;
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError>;
}

#[derive(Deserialize, Debug, Clone, Default)]
Expand Down
13 changes: 3 additions & 10 deletions bridge/svix-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,10 @@ async fn supervise_senders(inputs: Vec<Box<dyn SenderInput>>) -> Result<()> {
set.spawn(async move {
// FIXME: needs much better signaling for termination
loop {
let fut = input.run();
// If this future returns, the consumer terminated unexpectedly.
if let Err(e) = fut.await {
tracing::warn!(
"sender input {} unexpectedly terminated: {}",
input.name(),
e
);
} else {
tracing::warn!("sender input {} unexpectedly terminated", input.name());
}
input.run().await;

tracing::warn!("sender input {} unexpectedly terminated", input.name());
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
Expand Down
9 changes: 5 additions & 4 deletions bridge/svix-bridge/src/webhook_receiver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use axum::{
};
use serde_json::json;
use svix_bridge_types::{
async_trait, svix::webhooks::Webhook, ForwardRequest, ReceiverOutput, TransformationConfig,
TransformerInput, TransformerInputFormat, TransformerJob, TransformerOutput,
async_trait, svix::webhooks::Webhook, BoxError, ForwardRequest, ReceiverOutput,
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
TransformerOutput,
};
use tower::{Service, ServiceExt};

Expand Down Expand Up @@ -37,8 +38,8 @@ impl ReceiverOutput for FakeReceiverOutput {
"fake output"
}

async fn handle(&self, request: ForwardRequest) -> std::io::Result<()> {
self.tx.send(request.payload).unwrap();
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
self.tx.send(request.payload)?;
Ok(())
}
}
Expand Down