Skip to content

Commit 0d62fe1

Browse files
committed
Ensure the update_downloads job doesn't run concurrently
This is an improved implementation of #2157. The previous design relied on a transaction-based lock to manage the lifetime of the lock. The wrapper transaction caused the `update_downloads` job to interfere with incoming download requests, and the changes had to be reverted. This implementation uses a session lock, which is automatically released even if the callback panics. If multiple instances of the `update_downloads` job are run concurrently then it is possible to over count downloads, at least temporarily. The first job selects all matching `version_downloads` and later uses those values to calculate how many downloads to add to `versions` and `crates`. If a second job is run, it would select some rows from `version_downloads` that were already queued for processing by the first task. If an over count were to occur, the next time the job is run it should calculate a negative adjustment and correct the situation. There's no point in doing extra work and if we eventually need concurrency we should build that out intentionally. Therefore, this commit runs the entire job while holding a session-level advisory lock obtained from the database. If the lock has already been taken the job will fail and will be retried by swirl. If the duration of this job begins to approach the scheduling interval, then we will want to increase that interval to avoid triggering alerts.
1 parent 556e2b7 commit 0d62fe1

File tree

4 files changed

+111
-1
lines changed

4 files changed

+111
-1
lines changed

src/tasks.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
pub mod dump_db;
22
mod update_downloads;
3+
mod util;
34

45
pub use dump_db::dump_db;
56
pub use update_downloads::update_downloads;
7+
8+
pub(self) use self::util::advisory_lock::with_advisory_lock;
9+
10+
const UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY: i64 = 1;

src/tasks/update_downloads.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use super::with_advisory_lock;
2+
use super::UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY as LOCK_KEY;
13
use crate::{
24
background_jobs::Environment,
35
models::VersionDownload,
@@ -10,7 +12,7 @@ use swirl::PerformError;
1012
#[swirl::background_job]
1113
pub fn update_downloads(env: &Environment) -> Result<(), PerformError> {
1214
let conn = env.connection()?;
13-
update(&conn)?;
15+
with_advisory_lock(&conn, LOCK_KEY, update)?;
1416
Ok(())
1517
}
1618

src/tasks/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub(super) mod advisory_lock;

src/tasks/util/advisory_lock.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use std::error::Error;
2+
3+
use diesel::prelude::*;
4+
use diesel::sql_types::BigInt;
5+
use diesel::PgConnection;
6+
7+
sql_function!(fn pg_try_advisory_lock(key: BigInt) -> Bool);
8+
sql_function!(fn pg_advisory_unlock(key: BigInt) -> Bool);
9+
10+
/// Run the callback if the session advisory lock for the given key can be obtained.
11+
///
12+
/// If the lock is not already held, the callback will be called and the lock will be unlocked
13+
/// after the closure returns. If the lock is already held, the function returns an error without
14+
/// calling the callback.
15+
pub(crate) fn with_advisory_lock<F>(
16+
conn: &PgConnection,
17+
key: i64,
18+
f: F,
19+
) -> Result<(), Box<dyn Error>>
20+
where
21+
F: FnOnce(&PgConnection) -> QueryResult<()>,
22+
{
23+
if !diesel::select(pg_try_advisory_lock(key)).get_result(conn)? {
24+
let string = format!(
25+
"A job holding the session advisory lock for key {} is already running",
26+
key
27+
);
28+
println!("return");
29+
return Err(string.into());
30+
}
31+
println!("umm");
32+
let _dont_drop_yet = DropGuard { conn, key };
33+
f(conn).map_err(Into::into)
34+
}
35+
36+
struct DropGuard<'a> {
37+
conn: &'a PgConnection,
38+
key: i64,
39+
}
40+
41+
impl<'a> Drop for DropGuard<'a> {
42+
fn drop(&mut self) {
43+
match diesel::select(pg_advisory_unlock(self.key)).get_result(self.conn) {
44+
Ok(true) => (),
45+
Ok(false) => println!(
46+
"Error: job advisory lock for key {} was not locked",
47+
self.key
48+
),
49+
Err(err) => println!("Error unlocking advisory lock (key: {}): {}", self.key, err),
50+
}
51+
}
52+
}
53+
54+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::*;

    /// While the callback runs, a second connection must be refused the lock; once
    /// `with_advisory_lock` returns, the second connection must be able to take it.
    #[test]
    fn lock_released_after_callback_returns() {
        const KEY: i64 = -1;
        let conn1 = pg_connection();
        let conn2 = pg_connection();

        let mut callback_run = false;
        let result = with_advisory_lock(&conn1, KEY, |_| {
            // Another connection cannot obtain the lock
            let obtained: bool = diesel::select(pg_try_advisory_lock(KEY)).get_result(&conn2)?;
            assert!(!obtained);
            callback_run = true;
            Ok(())
        });
        assert!(result.is_ok());
        assert!(callback_run);

        // Another connection can now obtain the lock
        assert_eq!(
            diesel::select(pg_try_advisory_lock(KEY)).get_result(&conn2),
            Ok(true)
        );
    }

    /// When another connection already holds the lock, `with_advisory_lock` must
    /// return an error and must not invoke the callback.
    #[test]
    fn test_already_locked() {
        const KEY: i64 = -2;
        let conn1 = pg_connection();
        let conn2 = pg_connection();

        // Another connection obtains the lock first
        assert_eq!(
            diesel::select(pg_try_advisory_lock(KEY)).get_result(&conn2),
            Ok(true)
        );

        let mut callback_run = false;
        let result = with_advisory_lock(&conn1, KEY, |_| {
            callback_run = true;
            Ok(())
        });
        assert!(
            result.is_err(),
            "expected an error because the lock is held by another connection"
        );
        assert!(!callback_run);
    }
}

0 commit comments

Comments
 (0)