Remove unwraps from non-test code in queue/redis #551

Closed
106 changes: 74 additions & 32 deletions server/svix-server/src/queue/redis.rs
@@ -126,8 +126,7 @@ async fn background_task(
// FIXME: needs to be a transaction
let keys: Vec<(String, String)> = pool
.zpopmin(&delayed_queue_name, keys.len() as isize)
.await
.unwrap();
.await?;
let tasks: Vec<&str> = keys
.iter()
// All information is stored in the key in which the ID and JSON formatted task
@@ -212,16 +211,18 @@ async fn background_task(
/// Runs Redis queue migrations with the given delay schedule. Migrations are run on this schedule
/// so that if an old instance of the server is still online after the migrations are made, no data
/// will be lost, assuming the old server is taken offline before the last scheduled delay.
async fn run_migration_schedule(delays: &[Duration], pool: RedisPool) {
async fn run_migration_schedule(delays: &[Duration], pool: RedisPool) -> Result<()> {
for delay in delays {
let mut pool = pool.get().await.unwrap();
let mut pool = pool.get().await?;

// drain legacy queues:
migrate_v1_to_v2_queues(&mut pool).await;
migrate_v2_to_v3_queues(&mut pool).await;

tokio::time::sleep(*delay).await;
}

Ok(())
}

/// An inner function allowing key constants to be variable for testing purposes
@@ -306,7 +307,9 @@ async fn new_pair_inner(
Duration::from_secs(60 * 60 * 24),
];

run_migration_schedule(&delays, pool).await;
if let Err(err) = run_migration_schedule(&delays, pool).await {
tracing::error!("{}", err);
};
}
});

@@ -367,15 +370,17 @@ fn to_redis_key(delivery: &TaskQueueDelivery) -> String {
format!(
Contributor:
I wonder if we should just change this function to return a Result, just in case, since this method is called within the send method, which is potentially heavily used.

Member:
I agree. I think this is much more concerning than e.g. the migrations.

"{}|{}",
delivery.id,
serde_json::to_string(&delivery.task).unwrap()
serde_json::to_string(&delivery.task)
.expect("Could not parse string from TaskQueueDelivery.task")
)
}

fn from_redis_key(key: &str) -> TaskQueueDelivery {
// Get the first delimiter -> it has to have the |
let pos = key.find('|').unwrap();
let pos = key.find('|').expect("Key must contain '|'");
let id = (&key[..pos]).to_string();
let task = serde_json::from_str(&key[pos + 1..]).unwrap();
let task =
serde_json::from_str(&key[pos + 1..]).expect("Could not parse string from delimited key");
TaskQueueDelivery { id, task }
}
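
A hedged sketch of the change floated in the review thread above (making to_redis_key fallible rather than panicking on a serialization failure). It is not part of this diff; it assumes the crate's Result alias seen in run_migration_schedule and that the crate's error type can absorb a serde_json::Error via `?`, neither of which is verified here.

    // Hypothetical sketch only, not in this PR: a fallible to_redis_key.
    // Assumes the crate's Result/Error types can wrap serde_json::Error.
    fn to_redis_key(delivery: &TaskQueueDelivery) -> Result<String> {
        // The ID and the JSON-serialized task are joined by a '|' delimiter,
        // matching what from_redis_key splits on.
        let task_json = serde_json::to_string(&delivery.task)?;
        Ok(format!("{}|{}", delivery.id, task_json))
    }

Callers such as the send path would then propagate the error with `?` instead of panicking mid-delivery.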

@@ -507,17 +512,24 @@ impl TaskQueueReceive for RedisQueueConsumer {
}

async fn migrate_v2_to_v3_queues(pool: &mut PooledConnection<'_>) {
migrate_list_to_stream(pool, LEGACY_V2_MAIN, MAIN).await;
migrate_list_to_stream(pool, LEGACY_V2_PROCESSING, MAIN).await;
if let Err(err) = migrate_list_to_stream(pool, LEGACY_V2_MAIN, MAIN).await {
tracing::error!("{}", err);
};
if let Err(err) = migrate_list_to_stream(pool, LEGACY_V2_PROCESSING, MAIN).await {
tracing::error!("{}", err);
};
}

async fn migrate_list_to_stream(pool: &mut PooledConnection<'_>, legacy_queue: &str, queue: &str) {
async fn migrate_list_to_stream(
pool: &mut PooledConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
let batch_size = 1000;
loop {
let legacy_keys: Vec<String> = pool
.lpop(legacy_queue, NonZeroUsize::new(batch_size))
.await
.unwrap();
.await?;
if legacy_keys.is_empty() {
break;
}
@@ -533,28 +545,43 @@ async fn migrate_list_to_stream(pool: &mut PooledConnection<'_>, legacy_queue: &
let _ = pipe.xadd(
queue,
GENERATE_STREAM_ID,
&[(QUEUE_KV_KEY, serde_json::to_string(&delivery.task).unwrap())],
&[(
QUEUE_KV_KEY,
serde_json::to_string(&delivery.task)
.expect("Could not parse string from TaskQueueDelivery.task"),
)],
);
}

let _: () = pool.query_async_pipeline(pipe).await.unwrap();
let _: () = pool.query_async_pipeline(pipe).await?;
}

Ok(())
}

async fn migrate_v1_to_v2_queues(pool: &mut PooledConnection<'_>) {
migrate_list(pool, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await;
migrate_list(pool, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await;
migrate_sset(pool, LEGACY_V1_DELAYED, DELAYED).await;
if let Err(err) = migrate_list(pool, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await {
tracing::error!("{}", err);
};
if let Err(err) = migrate_list(pool, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await {
tracing::error!("{}", err);
};
if let Err(err) = migrate_sset(pool, LEGACY_V1_DELAYED, DELAYED).await {
tracing::error!("{}", err);
};
}

async fn migrate_list(pool: &mut PooledConnection<'_>, legacy_queue: &str, queue: &str) {
async fn migrate_list(
pool: &mut PooledConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
let batch_size = 1000;
loop {
// Checking for old messages from queue
let legacy_keys: Vec<String> = pool
.lpop(legacy_queue, NonZeroUsize::new(batch_size))
.await
.unwrap();
.await?;
if legacy_keys.is_empty() {
break;
}
Expand All @@ -563,15 +590,21 @@ async fn migrate_list(pool: &mut PooledConnection<'_>, legacy_queue: &str, queue
legacy_keys.len(),
legacy_queue
);
let _: () = pool.rpush(queue, legacy_keys).await.unwrap();
let _: () = pool.rpush(queue, legacy_keys).await?;
}

Ok(())
}

async fn migrate_sset(pool: &mut PooledConnection<'_>, legacy_queue: &str, queue: &str) {
async fn migrate_sset(
pool: &mut PooledConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
let batch_size = 1000;
loop {
// Checking for old messages from LEGACY_DELAYED
let legacy_keys: Vec<(String, f64)> = pool.zpopmin(legacy_queue, batch_size).await.unwrap();
let legacy_keys: Vec<(String, f64)> = pool.zpopmin(legacy_queue, batch_size).await?;

if legacy_keys.is_empty() {
break;
@@ -584,8 +617,10 @@ async fn migrate_sset(pool: &mut PooledConnection<'_>, legacy_queue: &str, queue
let legacy_keys: Vec<(f64, String)> =
legacy_keys.into_iter().map(|(x, y)| (y, x)).collect();

let _: () = pool.zadd_multiple(queue, &legacy_keys).await.unwrap();
let _: () = pool.zadd_multiple(queue, &legacy_keys).await?;
}

Ok(())
}

#[cfg(test)]
@@ -639,7 +674,8 @@ pub mod tests {
let should_be_none: Option<String> = pool.lpop(TEST_QUEUE, None).await.unwrap();
assert!(should_be_none.is_none());

migrate_list(&mut pool, TEST_LEGACY, TEST_QUEUE).await;
let res = migrate_list(&mut pool, TEST_LEGACY, TEST_QUEUE).await;
assert!(res.is_ok());

let test_key: Option<String> = pool.lpop(TEST_QUEUE, None).await.unwrap();

@@ -669,7 +705,8 @@ pub mod tests {
let should_be_none: Vec<(String, i32)> = pool.zpopmin(TEST_QUEUE, 1).await.unwrap();
assert!(should_be_none.is_empty());

migrate_sset(&mut pool, TEST_LEGACY, TEST_QUEUE).await;
let res = migrate_sset(&mut pool, TEST_LEGACY, TEST_QUEUE).await;
assert!(res.is_ok());

let test_key: Vec<(String, i32)> = pool.zpopmin(TEST_QUEUE, 1).await.unwrap();

@@ -933,13 +970,18 @@ pub mod tests {
}

// v1 to v2
migrate_list(&mut conn, v1_main, v2_main).await;
migrate_list(&mut conn, v1_processing, v2_processing).await;
migrate_sset(&mut conn, v1_delayed, v2_delayed).await;
let r1 = migrate_list(&mut conn, v1_main, v2_main).await;
Member:
The way to do it in tests is just to unwrap. This is too cumbersome.

let r2 = migrate_list(&mut conn, v1_processing, v2_processing).await;
let r3 = migrate_sset(&mut conn, v1_delayed, v2_delayed).await;
assert!(r1.is_ok());
assert!(r2.is_ok());
assert!(r3.is_ok());

// v2 to v3
migrate_list_to_stream(&mut conn, v2_main, v3_main).await;
migrate_list_to_stream(&mut conn, v2_processing, v3_main).await;
let r4 = migrate_list_to_stream(&mut conn, v2_main, v3_main).await;
let r5 = migrate_list_to_stream(&mut conn, v2_processing, v3_main).await;
assert!(r4.is_ok());
assert!(r5.is_ok());
}
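
The reviewer's preferred test-side pattern (not applied in this diff) would drop the intermediate result bindings and unwrap directly, e.g.:

    // Hypothetical alternative reflecting the review suggestion: just unwrap in test code.
    migrate_list(&mut conn, v1_main, v2_main).await.unwrap();
    migrate_list(&mut conn, v1_processing, v2_processing).await.unwrap();
    migrate_sset(&mut conn, v1_delayed, v2_delayed).await.unwrap();
    migrate_list_to_stream(&mut conn, v2_main, v3_main).await.unwrap();
    migrate_list_to_stream(&mut conn, v2_processing, v3_main).await.unwrap();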

// Read