Skip to content

Commit

Permalink
Fix flaky tests
Browse files Browse the repository at this point in the history
Bring in downstream changes to prevent flakes here.
  • Loading branch information
jaymell authored and svix-james committed Oct 10, 2024
1 parent b82f9ff commit d0443bd
Showing 1 changed file with 67 additions and 91 deletions.
158 changes: 67 additions & 91 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ async fn migrate_sset(
pub mod tests {
use std::time::Duration;

use assert_matches::assert_matches;
use chrono::Utc;
use redis::{streams::StreamReadReply, AsyncCommands as _, Direction};
use tokio::time::timeout;
Expand All @@ -415,7 +414,7 @@ pub mod tests {
use crate::{
cfg::Configuration,
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer},
queue::{MessageTask, QueueTask},
redis::RedisManager,
};

Expand Down Expand Up @@ -492,18 +491,12 @@ pub mod tests {
assert_eq!(should_be_none, vec![]);
}

/// Reads and acknowledges all items in the queue with the given name for clearing out entries
/// from previous test runs
async fn flush_stale_queue_items(_p: TaskQueueProducer, c: &mut TaskQueueConsumer) {
while let Ok(recv) = timeout(
Duration::from_millis(100),
c.receive_all(TEST_RECV_DEADLINE),
)
.await
{
let recv = recv.unwrap().pop().unwrap();
recv.ack().await.unwrap();
}
async fn cleanup(pool: &RedisManager, q1: &str, q2: &str, q3: &str) {
let mut conn = pool
.get()
.await
.expect("Error retrieving connection from Redis pool");
let _: () = conn.del(&[q1, q2, q3]).await.unwrap();
}

#[tokio::test]
Expand All @@ -512,19 +505,16 @@ pub mod tests {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(&cfg).await;

let (p, mut c) = new_pair_inner(
&cfg,
Duration::from_millis(100),
"",
"{test}_idle_period",
"{test}_idle_period_delayed",
"{test}_idle_period_delayed_lock",
"{test}_dlq",
)
.await;
let main_queue = "{test}_idle_period";
let delayed = "{test}_idle_period_delayed";
let lock = "{test}_idle_period_delayed_lock";
let dlq = "{test}_dlq";

let delay = Duration::from_millis(100);

tokio::time::sleep(Duration::from_millis(150)).await;
flush_stale_queue_items(p.clone(), &mut c).await;
cleanup(&pool, main_queue, delayed, lock).await;

let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;

let mt = QueueTask::MessageV1(MessageTask {
msg_id: MessageId("test".to_owned()),
Expand All @@ -540,9 +530,9 @@ pub mod tests {
.expect("`c.receive()` has timed out");
assert_eq!(*recv.unwrap()[0].task, mt);

tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(delay).await;

let recv = timeout(Duration::from_secs(5), c.receive_all(TEST_RECV_DEADLINE))
let recv = timeout(Duration::from_secs(1), c.receive_all(TEST_RECV_DEADLINE))
.await
.expect("`c.receive()` has timed out");
let recv = recv.unwrap().pop().unwrap();
Expand All @@ -555,12 +545,12 @@ pub mod tests {
.get()
.await
.expect("Error retrieving connection from Redis pool");
let keys = conn
.xread::<_, _, StreamReadReply>(&["{test}_ack"], &[0])
assert!(conn
.xread::<_, _, StreamReadReply>(&[main_queue], &[0])
.await
.unwrap()
.keys;
assert_matches!(keys.as_slice(), []);
.keys
.is_empty());
}

#[tokio::test]
Expand All @@ -569,30 +559,16 @@ pub mod tests {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(&cfg).await;

// Delete the keys used in this test to ensure nothing pollutes the output
let mut conn = pool
.get()
.await
.expect("Error retrieving connection from Redis pool");
let _: () = conn
.del(&[
"{test}_ack",
"{test}_ack_delayed",
"{test}_ack_delayed_lock",
])
.await
.unwrap();
let main_queue = "{test}_ack";
let delayed = "{test}_ack_delayed";
let lock = "{test}_ack_delayed_lock";
let dlq = "{test}_dlq";

let (p, mut c) = new_pair_inner(
&cfg,
Duration::from_millis(5000),
"",
"{test}_ack",
"{test}_ack_delayed",
"{test}_ack_delayed_lock",
"{test}_dlq",
)
.await;
cleanup(&pool, main_queue, delayed, lock).await;

let delay = Duration::from_millis(100);

let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;

let mt = QueueTask::MessageV1(MessageTask {
msg_id: MessageId("test2".to_owned()),
Expand All @@ -612,38 +588,39 @@ pub mod tests {
assert_eq!(*recv.task, mt);
recv.ack().await.unwrap();

if let Ok(recv) = timeout(Duration::from_secs(1), c.receive_all(TEST_RECV_DEADLINE)).await {
if let Ok(recv) = timeout(delay, c.receive_all(TEST_RECV_DEADLINE)).await {
panic!("Received unexpected QueueTask {:?}", recv.unwrap()[0].task);
}

let mut conn = pool
.get()
.await
.expect("Error retrieving connection from Redis pool");
// And assert that the task has been deleted
let keys = conn
.xread::<_, _, StreamReadReply>(&["{test}_ack"], &[0])
assert!(conn
.xread::<_, _, StreamReadReply>(&[main_queue], &[0])
.await
.unwrap()
.keys;
assert_matches!(keys.as_slice(), []);
.keys
.is_empty());
}

#[tokio::test]
#[ignore]
async fn test_nack() {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(&cfg).await;

let (p, mut c) = new_pair_inner(
&cfg,
Duration::from_millis(500),
"",
"{test}_nack",
"{test}_nack_delayed",
"{test}_nack_delayed_lock",
"{test}_dlq",
)
.await;
let main_queue = "{test}_nack";
let delayed = "{test}_nack_delayed";
let lock = "{test}_nack_delayed_lock";
let dlq = "{test}_nack_delayed_dlq";

tokio::time::sleep(Duration::from_millis(550)).await;
cleanup(&pool, main_queue, delayed, lock).await;

flush_stale_queue_items(p.clone(), &mut c).await;
let delay = Duration::from_millis(100);

let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;

let mt = QueueTask::MessageV1(MessageTask {
msg_id: MessageId("test".to_owned()),
Expand All @@ -663,31 +640,30 @@ pub mod tests {
assert_eq!(*recv.task, mt);
recv.nack().await.unwrap();

let recv = timeout(Duration::from_secs(1), c.receive_all(TEST_RECV_DEADLINE))
.await
.expect("Expected QueueTask");
let recv = timeout(
Duration::from_millis(500) + delay,
c.receive_all(TEST_RECV_DEADLINE),
)
.await
.expect("Expected QueueTask");
assert_eq!(*recv.unwrap().pop().unwrap().task, mt);
}

#[tokio::test]
#[ignore]
async fn test_delay() {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(&cfg).await;

let (p, mut c) = new_pair_inner(
&cfg,
Duration::from_millis(500),
"",
"{test}_delay",
"{test}_delay_delayed",
"{test}_delay_delayed_lock",
"{test}_dlq",
)
.await;
let main_queue = "{test}_delay";
let delayed = "{test}_delay_delayed";
let lock = "{test}_delay_delayed_lock";
let dlq = "{test}_delay_delayed_dlq";

tokio::time::sleep(Duration::from_millis(550)).await;
cleanup(&pool, main_queue, delayed, lock).await;

flush_stale_queue_items(p.clone(), &mut c).await;
let delay = Duration::from_millis(500);
let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;

let mt1 = QueueTask::MessageV1(MessageTask {
msg_id: MessageId("test1".to_owned()),
Expand All @@ -709,20 +685,20 @@ pub mod tests {
.unwrap();
p.send(&mt2, None).await.unwrap();

let [recv2] = c
let recv2 = c
.receive_all(TEST_RECV_DEADLINE)
.await
.unwrap()
.try_into()
.pop()
.unwrap();
assert_eq!(*recv2.task, mt2);
recv2.ack().await.unwrap();

let [recv1] = c
let recv1 = c
.receive_all(TEST_RECV_DEADLINE)
.await
.unwrap()
.try_into()
.pop()
.unwrap();
assert_eq!(*recv1.task, mt1);
recv1.ack().await.unwrap();
Expand Down

0 comments on commit d0443bd

Please sign in to comment.