Skip to content

Commit

Permalink
bridge: Upgrade dependencies (#1482)
Browse files Browse the repository at this point in the history
## Motivation

After getting a rustsec warning in #1469, I attempted this upgrade last
week, but it failed due to a weird version constraint in opentelemetry
(`tokio ~= 1.38.0`). The rustsec ended up being updated so I could merge
the other PR w/o these upgrades, but I feel like it still makes sense to
get this merged now rather than later.

## Solution

Upgrade most of our dependencies, including axum 0.6 > 0.7.
  • Loading branch information
svix-jplatte authored Oct 9, 2024
2 parents eda2708 + 7687fdc commit 19e850a
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 291 deletions.
458 changes: 212 additions & 246 deletions bridge/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions bridge/svix-bridge-plugin-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ tracing = "0.1"

[dependencies.omniqueue]
git = "https://github.com/svix/omniqueue-rs"
rev = "62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b"
rev = "e953ce07621a33708a4c28d9a5cfe431ede45dee"
default-features = false
features = ["gcp_pubsub", "rabbitmq", "redis", "sqs"]

[dev-dependencies]
aws-config = "1.1.5"
aws-sdk-sqs = "1.13.0"
fastrand = "2.0.1"
google-cloud-googleapis = "0.12.0"
google-cloud-pubsub = "0.24.0"
google-cloud-googleapis = "0.15.0"
google-cloud-pubsub = "0.29.1"
lapin = "2"
redis = { version = "0.25.4", features = ["tokio-comp", "streams"] }
redis = { version = "0.27.2", features = ["tokio-comp", "streams"] }
tracing-subscriber.workspace = true
wiremock.workspace = true

Expand Down
4 changes: 4 additions & 0 deletions bridge/svix-bridge-plugin-queue/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
// FIXME: expose in config?
payload_key: "payload".to_string(),
ack_deadline_ms: cfg.ack_deadline_ms,
dlq_config: None,
sentinel_config: None,
})
.make_dynamic()
.build_consumer()
Expand All @@ -80,6 +82,8 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
consumer_group: String::new(),
consumer_name: String::new(),
ack_deadline_ms: cfg.ack_deadline_ms,
dlq_config: None,
sentinel_config: None,
})
.make_dynamic()
.build_producer()
Expand Down
23 changes: 11 additions & 12 deletions bridge/svix-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,38 @@ publish = false
anyhow = "1"
base64 = "0.13.1"
clap = { version = "4.2.4", features = ["env", "derive"] }
axum = { version = "0.6", features = ["macros"] }
axum = { version = "0.7.7", features = ["macros"] }
enum_dispatch = "0.3"
itertools = "0.12.1"
http = "0.2"
itertools = "0.13.0"
once_cell = "1.18.0"
opentelemetry = "0.22.0"
opentelemetry_sdk = { version = "0.22.1", features = ["metrics", "rt-tokio"] }
opentelemetry-otlp = { version = "0.15.0", features = ["metrics", "grpc-tonic", "http-proto", "reqwest-client"] }
opentelemetry = "0.26.0"
opentelemetry_sdk = { version = "0.26.0", features = ["metrics", "rt-tokio"] }
opentelemetry-otlp = { version = "0.26.0", features = ["metrics", "grpc-tonic", "http-proto", "reqwest-client"] }
serde.workspace = true
serde_json.workspace = true
serde_yaml = "0.9"
svix-ksuid = "0.7.0"
svix-ksuid = "0.8.0"
svix-bridge-plugin-queue = { path = "../svix-bridge-plugin-queue" }
svix-bridge-plugin-kafka = { optional = true, path = "../svix-bridge-plugin-kafka" }
svix-bridge-types.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-opentelemetry = "0.23.0"
tracing-opentelemetry = "0.27.0"
tracing-subscriber = { workspace = true, features = ["fmt", "json"] }
# N.b. for newer deno versions (like this) the runtimes must be retained and reused since they will leak memory if you
# create/drop them.
deno_core = "0.308.0"
deno_ast = "0.42.1"
deadpool = { version = "0.9.5", features = ["unmanaged", "rt_tokio_1"] }
deadpool = { version = "0.12.1", features = ["unmanaged", "rt_tokio_1"] }
shellexpand = { version = "3.1.0", default-features = false, features = ["base-0"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
tikv-jemalloc-ctl = { version = "0.5", optional = true, features = ["use_std"] }
tikv-jemallocator = { version = "0.6.0", optional = true }
tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["use_std", "stats"] }

[dev-dependencies]
chrono = "0.4"
tower = "0.4"
tower = "0.5.1"

[features]
default = ["kafka", "jemalloc"]
Expand Down
6 changes: 4 additions & 2 deletions bridge/svix-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
use clap::Parser;
use itertools::{Either, Itertools};
use once_cell::sync::Lazy;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
metrics::{data::Temporality, reader::TemporalitySelector, InstrumentKind, SdkMeterProvider},
Expand Down Expand Up @@ -89,7 +90,7 @@ fn setup_tracing(cfg: &Config) {
.tracing()
.with_exporter(exporter)
.with_trace_config(
opentelemetry_sdk::trace::config()
opentelemetry_sdk::trace::Config::default()
.with_sampler(
otel_cfg
.sample_ratio
Expand All @@ -99,7 +100,8 @@ fn setup_tracing(cfg: &Config) {
.with_resource(get_svc_identifiers(cfg)),
)
.install_batch(Tokio)
.unwrap();
.unwrap()
.tracer("svix_bridge");

tracing_opentelemetry::layer().with_tracer(tracer)
});
Expand Down
8 changes: 4 additions & 4 deletions bridge/svix-bridge/src/webhook_receiver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};

use axum::{
body::Body,
extract::{Path, State},
http,
routing::post,
Router,
};
Expand All @@ -26,7 +26,7 @@ mod config;
mod types;
mod verification;

fn router() -> Router<InternalState, Body> {
fn router() -> Router<InternalState> {
Router::new()
.route(
"/webhook/:integration_id",
Expand All @@ -50,8 +50,8 @@ pub async fn run(
let router = router().with_state(state);

tracing::info!("Listening on: {listen_addr}");
axum::Server::bind(&listen_addr)
.serve(router.into_make_service())
let listener = tokio::net::TcpListener::bind(listen_addr).await.unwrap();
axum::serve(listener, router)
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}
Expand Down
20 changes: 10 additions & 10 deletions bridge/svix-bridge/src/webhook_receiver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn test_forwarding_no_verification() {
.uri("/webhook/a")
.method("POST")
.header("content-type", "application/json")
.body(serde_json::to_vec(&json!({"a": true})).unwrap().into())
.body(axum::body::Body::from(json!({ "a": true }).to_string()))
.unwrap(),
)
.await
Expand Down Expand Up @@ -110,7 +110,7 @@ async fn test_forwarding_multiple_receivers() {
.uri("/webhook/a")
.method("POST")
.header("content-type", "application/json")
.body(serde_json::to_vec(&json!({"a": true})).unwrap().into())
.body(axum::body::Body::from(json!({ "a": true }).to_string()))
.unwrap();

let response = ServiceExt::<Request<Body>>::ready(&mut app)
Expand All @@ -128,7 +128,7 @@ async fn test_forwarding_multiple_receivers() {
.uri("/webhook/b")
.method("POST")
.header("content-type", "application/json")
.body(serde_json::to_vec(&json!({"b": true})).unwrap().into())
.body(axum::body::Body::from(json!({ "b": true }).to_string()))
.unwrap();

let response = ServiceExt::<Request<Body>>::ready(&mut app)
Expand Down Expand Up @@ -200,7 +200,7 @@ async fn test_transformation_json() {
.uri("/webhook/transformed")
.method("POST")
.header("content-type", "application/json")
.body(serde_json::to_vec(&json!({"a": true})).unwrap().into())
.body(axum::body::Body::from(json!({ "a": true }).to_string()))
.unwrap();

let response = ServiceExt::<Request<Body>>::ready(&mut app)
Expand All @@ -222,7 +222,7 @@ async fn test_transformation_json() {
.uri("/webhook/as-is")
.method("POST")
.header("content-type", "application/json")
.body(serde_json::to_vec(&json!({"b": true})).unwrap().into())
.body(axum::body::Body::from(json!({ "b": true }).to_string()))
.unwrap();

let response = ServiceExt::<Request<Body>>::ready(&mut app)
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn test_transformation_string() {
.uri("/webhook/transformed")
.method("POST")
.header("content-type", "text/plain")
.body("plain text".as_bytes().into())
.body(axum::body::Body::from("plain text"))
.unwrap();

let response = ServiceExt::<Request<Body>>::ready(&mut app)
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn test_forwarding_svix_verification_mismatch() {
.header("svix-id", "msg_valid")
.header("svix-signature", signature.clone())
.header("svix-timestamp", &format!("{timestamp}"))
.body(sent_payload_bytes.into())
.body(axum::body::Body::from(sent_payload_bytes))
.unwrap(),
)
.await
Expand Down Expand Up @@ -383,13 +383,13 @@ async fn test_forwarding_svix_verification_match() {
.header("content-type", "application/json")
.header("svix-id", "msg_valid")
.header("svix-signature", signature.clone())
.header("svix-timestamp", &format!("{timestamp}"))
.body(payload_bytes.into())
.header("svix-timestamp", timestamp.to_string())
.body(axum::body::Body::from(payload_bytes))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let forwarded = a_rx.try_recv().unwrap();
assert_eq!(json!(forwarded), json!({"a": true}));
assert_eq!(json!(forwarded), json!({ "a": true }));
}
16 changes: 6 additions & 10 deletions bridge/svix-bridge/src/webhook_receiver/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use anyhow::Result;
use axum::{
async_trait,
body::{Bytes, HttpBody},
extract::FromRequest,
BoxError,
body::Bytes,
extract::{FromRequest, Request},
http::{self, HeaderMap, HeaderValue},
};
use http::{HeaderMap, HeaderValue, Request};
use serde::{Deserialize, Serialize};
use svix_bridge_types::{
svix, ReceiverInputOpts, ReceiverOutput, TransformationConfig, TransformerTx, WebhookVerifier,
Expand Down Expand Up @@ -212,16 +211,13 @@ impl From<RequestFromParts> for SerializableRequest<Unvalidated> {
}

#[async_trait]
impl<S, B> FromRequest<S, B> for SerializableRequest<Unvalidated>
impl<S> FromRequest<S> for SerializableRequest<Unvalidated>
where
S: Send + Sync,
B: HttpBody + Send + Sync + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Rejection = <RequestFromParts as FromRequest<S, B>>::Rejection;
type Rejection = <RequestFromParts as FromRequest<S>>::Rejection;

async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
RequestFromParts::from_request(req, state)
.await
.map(Into::into)
Expand Down
6 changes: 3 additions & 3 deletions bridge/svix-bridge/src/webhook_receiver/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub enum Verifier {
mod tests {
use std::sync::Arc;

use axum::extract::FromRequest;
use axum::{extract::FromRequest, http};
use svix_bridge_types::svix::webhooks::Webhook;

use super::{super::types::SerializableRequest, SvixVerifier, VerificationMethod};
Expand All @@ -107,7 +107,7 @@ mod tests {
.header("svix-id", "msg_valid")
.header("svix-signature", signature.clone())
.header("svix-timestamp", &format!("{timestamp}"))
.body(axum::body::Full::new(payload))
.body(axum::body::Body::from(payload))
.unwrap();

let sr = SerializableRequest::from_request(req, &()).await.unwrap();
Expand All @@ -119,7 +119,7 @@ mod tests {
.header("svix-id", "msg_invalid")
.header("svix-signature", signature)
.header("svix-timestamp", &format!("{timestamp}"))
.body(axum::body::Full::new(payload))
.body(axum::body::Body::from(payload))
.unwrap();

let sr = SerializableRequest::from_request(req, &()).await.unwrap();
Expand Down

0 comments on commit 19e850a

Please sign in to comment.