Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions bridge/svix-bridge-plugin-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
omniqueue = "0.2.0"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
svix-bridge-types = { path = "../svix-bridge-types" }
Expand All @@ -15,13 +14,19 @@ tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
tracing = "0.1"

[dependencies.omniqueue]
git = "https://github.com/svix/omniqueue-rs"
rev = "62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b"
default-features = false
features = ["gcp_pubsub", "rabbitmq", "redis", "sqs"]

[dev-dependencies]
aws-config = "1.1.5"
aws-sdk-sqs = "1.13.0"
fastrand = "2.0.1"
google-cloud-googleapis = "0.12.0"
google-cloud-pubsub = "0.23.0"
google-cloud-pubsub = "0.24.0"
lapin = "2"
redis = { version = "0.24.0", features = ["tokio-comp", "streams"] }
redis = { version = "0.25.4", features = ["tokio-comp", "streams"] }
tracing-subscriber = "0.3"
wiremock = "0.5.18"
58 changes: 27 additions & 31 deletions bridge/svix-bridge-plugin-queue/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,19 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
.unwrap_or_else(|| format!("{}_delays", cfg.queue_key));
let delayed_lock_key = format!("{delayed_queue_key}_lock");

backends::RedisBackend::<backends::redis::RedisMultiplexedConnectionManager>::builder(
backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
reinsert_on_nack: cfg.reinsert_on_nack,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
consumer_group: cfg.consumer_group.clone(),
consumer_name: cfg.consumer_name.clone(),
// FIXME: expose in config?
payload_key: "payload".to_string(),
ack_deadline_ms: cfg.ack_deadline_ms,
},
)
backends::RedisBackend::builder(backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
reinsert_on_nack: cfg.reinsert_on_nack,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
consumer_group: cfg.consumer_group.clone(),
consumer_name: cfg.consumer_name.clone(),
// FIXME: expose in config?
payload_key: "payload".to_string(),
ack_deadline_ms: cfg.ack_deadline_ms,
})
.make_dynamic()
.build_consumer()
.await
Expand All @@ -69,22 +67,20 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
.unwrap_or_else(|| format!("{}_delays", cfg.queue_key));
let delayed_lock_key = format!("{delayed_queue_key}_lock");

backends::RedisBackend::<backends::redis::RedisMultiplexedConnectionManager>::builder(
backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
// FIXME: expose in config?
payload_key: "payload".to_string(),
// consumer stuff we don't care about.
reinsert_on_nack: false,
consumer_group: String::new(),
consumer_name: String::new(),
ack_deadline_ms: cfg.ack_deadline_ms,
},
)
backends::RedisBackend::builder(backends::RedisConfig {
dsn: cfg.dsn.clone(),
max_connections: cfg.max_connections,
queue_key: cfg.queue_key.clone(),
delayed_queue_key,
delayed_lock_key,
// FIXME: expose in config?
payload_key: "payload".to_string(),
// consumer stuff we don't care about.
reinsert_on_nack: false,
consumer_group: String::new(),
consumer_name: String::new(),
ack_deadline_ms: cfg.ack_deadline_ms,
})
.make_dynamic()
.build_producer()
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn create_test_stream(client: &Client) -> String {
.take(8)
.collect();

let mut conn = client.get_async_connection().await.unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();

let _: () = conn
.xgroup_create_mkstream(&name, "test_cg", 0i8)
Expand All @@ -70,12 +70,12 @@ async fn create_test_stream(client: &Client) -> String {
}

async fn delete_test_stream(client: &Client, key: &str) {
let mut conn = client.get_async_connection().await.unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
let _: () = conn.del(key).await.unwrap();
}

async fn publish(client: &Client, key: &str, payload: &str) {
let mut conn = client.get_async_connection().await.unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
// N.b. the redis code relies on the messages being json with a `payload` key in there.
// The `payload` key can be any valid JSON value.
let _: () = conn.xadd(key, "*", &[("payload", payload)]).await.unwrap();
Expand Down