Skip to content

Commit

Permalink
Bridge: decode svix events poller's "bridge config" from base64
Browse files Browse the repository at this point in the history
There's a new authtoken endpoint for creating fresh events
subscriptions. In the response is a base64 encoded json structure
including the authtoken to use, the subscription id, and the app id -
everything we need to build an appropriate URL for fetching events from
the stream.
  • Loading branch information
svix-onelson committed Jul 10, 2024
1 parent 9070949 commit 72e872d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
1 change: 1 addition & 0 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bridge/svix-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ publish = false

[dependencies]
anyhow = "1"
base64 = "0.13.1"
clap = { version = "4.2.4", features = ["env", "derive"] }
axum = { version = "0.6", features = ["macros"] }
enum_dispatch = "0.3"
Expand Down
34 changes: 28 additions & 6 deletions bridge/svix-bridge/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use anyhow::anyhow;
use serde::Deserialize;
use serde::{de, Deserialize, Deserializer};
use shellexpand::LookupError;
#[cfg(feature = "kafka")]
use svix_bridge_plugin_kafka::{KafkaInputOpts, KafkaOutputOpts};
Expand Down Expand Up @@ -253,13 +253,35 @@ impl WebhookReceiverConfig {
}
}

#[derive(Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MessageStreamBridgeConfig {
pub token: String,
pub app_id: String,
pub subscription_id: String,
}

fn deserialize_message_stream_bridge_config<'de, D>(
deserializer: D,
) -> Result<MessageStreamBridgeConfig, D::Error>
where
D: Deserializer<'de>,
{
let buf = String::deserialize(deserializer)?;
let decoded = base64::decode(buf)
.map_err(|e| de::Error::custom(format!("failed to decode subscription config: {e:?}")))?;
serde_json::from_slice(&decoded)
.map_err(|e| de::Error::custom(format!("failed to decode subscription config: {e:?}")))
}

#[derive(Clone, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum PollerInputOpts {
SvixEvents {
app_id: String,
subscription_id: String,
svix_token: String,
/// This is the base64 encoded JSON given as `bridgeConfig` in the response from
/// `v1.message.events-subscription.create-token`.
#[serde(deserialize_with = "deserialize_message_stream_bridge_config")]
subscription_token: MessageStreamBridgeConfig,
#[serde(default)]
svix_options: Option<SvixOptions>,
},
Expand All @@ -269,11 +291,11 @@ impl PollerInputOpts {
pub fn svix_client(&self) -> Option<Svix> {
match self {
PollerInputOpts::SvixEvents {
svix_token,
subscription_token,
svix_options,
..
} => Some(Svix::new(
svix_token.clone(),
subscription_token.token.clone(),
svix_options.clone().map(Into::into),
)),
}
Expand Down
12 changes: 9 additions & 3 deletions bridge/svix-bridge/src/webhook_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use tracing::instrument;
use types::{IntegrationId, IntegrationState, InternalState, SerializableRequest, Unvalidated};

use crate::{
config::{PollerInputOpts, PollerReceiverConfig, WebhookReceiverConfig},
config::{
MessageStreamBridgeConfig, PollerInputOpts, PollerReceiverConfig, WebhookReceiverConfig,
},
webhook_receiver::types::SerializablePayload,
};

Expand Down Expand Up @@ -256,8 +258,12 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! {
let mut sleep_time = NO_SLEEP;

let PollerInputOpts::SvixEvents {
app_id,
subscription_id,
subscription_token:
MessageStreamBridgeConfig {
app_id,
subscription_id,
..
},
..
} = &poller.input_opts;

Expand Down

0 comments on commit 72e872d

Please sign in to comment.