Commit 9d4b2a3
Break update_downloads into smaller jobs
This changes the behavior of the `update_downloads` background job from processing all rows serially to spawning a smaller job for each 1000 rows that need to be processed. This shortens how long any one job runs (making us less likely to hit timeouts in the runner and encounter the issues that #2267 and #1804 addressed). More importantly, it means that we are able to do more in parallel, reducing the overall time it takes to count downloads.

About the Problem
===

There are two main thresholds we care about for how long this job takes to run:

- If it takes longer than the interval at which we enqueue this job (typically every 10 minutes, currently every hour due to the issues this PR addresses), we can end up with two instances of it running in parallel. This causes downloads to get double counted, and the jobs tend to contend for row locks and slow each other down. The double counting will be corrected the next time the job runs. This only tends to happen when a crawler downloads a large number of crates in rapid succession, pushing the rows we have to process from our normal volume of ~10k per hour to ~150k. When this occurs, we're likely to hit the second threshold.
- If it takes longer than `$MAX_JOB_TIME` (currently set to 60 for the reasons below, defaults to 15), I will be paged. This has been happening much more frequently as of late (which is why that env var is currently at 60 minutes). It's unclear whether this is because crawlers are downloading large volumes of crates more frequently, or whether normal volume is simply pushing us over 15 minutes to process serially.

Splitting into smaller jobs doesn't directly help with either threshold, but being able to process rows in parallel does, since the overall time this takes to complete will go down dramatically (currently by a factor of 4, but we can probably set the number of threads higher than the CPU core count and still see benefits, since we're I/O bound).
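The "one job per 1000 rows" split works by keyset pagination: fetch up to `ROWS_PER_BATCH` unprocessed ids greater than the last id seen, enqueue a batch job for that range, and repeat until nothing is left. A minimal stdlib-only sketch of that pagination step, where `batch_ranges` and the in-memory id slice are hypothetical stand-ins for the Diesel query and job enqueueing:

```rust
// Hypothetical stand-in for the `version_downloads` query: a sorted list of
// version ids whose downloads still need to be counted.
fn batch_ranges(pending_ids: &[i32], rows_per_batch: usize) -> Vec<(i32, i32)> {
    let mut batches = Vec::new();
    let mut last_id = 0;
    loop {
        // Mimics `.filter(version_id.gt(id)).limit(ROWS_PER_BATCH)`.
        let chunk: Vec<i32> = pending_ids
            .iter()
            .copied()
            .filter(|&id| id > last_id)
            .take(rows_per_batch)
            .collect();
        match chunk.last() {
            Some(&max_id) => {
                // The real job enqueues `update_downloads_batch(last_id, max_id)` here.
                batches.push((last_id, max_id));
                last_id = max_id;
            }
            None => return batches,
        }
    }
}

fn main() {
    let pending = [1, 2, 5, 9, 10, 42];
    // Batches of 2 ids yield three (min_exclusive, max_inclusive) ranges.
    assert_eq!(batch_ranges(&pending, 2), vec![(0, 2), (2, 9), (9, 42)]);
}
```

Because each batch is bounded by the ids actually returned, gaps in `version_id` don't produce empty batches, and the loop terminates as soon as a fetch comes back empty.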
Based on extremely anecdotal, non-scientific measurements ("I ran `select count(*) from version_downloads where downloads != counted` roughly every minute a few times while the job was churning through >100k rows"), we can process roughly ~4k rows per minute, which seems about right for 6 queries per row. We could substantially increase throughput by reducing this to one round trip, but for now we can expect roughly 15 seconds per batch.

The longest I've ever seen this job take (and since I get paged when it runs too long, I have definitely seen the longest run times) is just over an hour. Since this change should reduce that by *at least* a factor of 4, a run where every version was downloaded at least once since the last run should take around 15 minutes. If we can bring this down to a single round trip per row, that should further reduce it to around 2.5 minutes.

Since this means we'll use all available worker threads in parallel, it also means that even if `update_downloads` is queued again before the previous run completes, it's unlikely to ever be looking at the same rows in parallel: the batches from the second run won't be handled until all but worker_count - 1 batches from the first run have completed.

Drawbacks
===

There are two main drawbacks to this commit:

- Since we no longer process rows serially before running `update_recent_crate_downloads`, the data in `recent_crate_downloads` will reflect the *previous* run of `update_downloads`, meaning it's basically always 10-20 minutes behind. This is a regression compared to a few months ago, when it was typically 3-13 minutes behind, but an improvement over today, where it's 3-63 minutes behind.
- The entire background queue will be blocked while `update_downloads` runs. This was the case prior to #1804. At the time of that commit, we did not consider blocking publishes to be a problem.
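The parallelism argument above, where batches drain through a fixed pool of worker threads so a second run's batches simply queue up behind the first's, can be sketched with stdlib threads and a channel standing in for swirl's job queue (`process_batches_in_parallel` and the toy batches are hypothetical, not the real job code):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Each "batch" here just sums its rows; the real batch jobs run the six
// per-row queries against Postgres instead, which is why this is I/O bound.
fn process_batches_in_parallel(batches: Vec<Vec<u32>>, workers: usize) -> u32 {
    let (tx, rx) = mpsc::channel();
    // Share one receiving end between workers, like a job queue.
    let rx = Arc::new(Mutex::new(rx));
    for batch in batches {
        tx.send(batch).unwrap();
    }
    drop(tx); // close the queue so workers exit once it drains

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut counted = 0;
                loop {
                    // Take the next batch, or stop when the queue is empty and closed.
                    let batch = match rx.lock().unwrap().recv() {
                        Ok(b) => b,
                        Err(_) => return counted,
                    };
                    counted += batch.iter().sum::<u32>();
                }
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let batches = vec![vec![1, 2], vec![3], vec![4, 5, 6]];
    assert_eq!(process_batches_in_parallel(batches, 4), 21);
}
```

Wall time scales with the slowest worker's share of batches rather than the total, which is where the "factor of 4" (or better, with more threads) comes from.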
We added the additional thread (assuming only one would be taken by `update_downloads` at any given time) to prevent the runner from crashing because it couldn't tell whether progress was being made. That won't be an issue with this commit (since we're always going to make progress in relatively small chunks), but it does mean that index updates can be delayed by as much as 15 minutes in the worst case. (This number may be higher than is realistic, since we've only observed >1 hour runs with the job set to queue hourly, meaning more rows to process per run.) Typically the delay will be at most 30 seconds.

If I weren't getting paged almost every day, I'd say this PR should be blocked on the second issue (which would be resolved by adding queue priority to swirl). But given the operational load this issue is causing, I think increasing the worst-case delay for index updates is a reasonable tradeoff for now.

Impl details
===

I've written the test in a somewhat funky way, adding functions to move a connection into and out of a test DB pool. This was primarily so I could change the tests to queue the job and then run any pending jobs without too much churn (otherwise the runner would have to own the connection, and every use of the connection would need braces around it since we'd have to fetch it from the pool each time).

This relies on an update to swirl (which is not in master at the time of writing this commit) for ease of testing. Testing `update_downloads` after this change requires actually running the background job. At the time of writing, on master that would mean constructing a `background_jobs::Environment`, which involves cloning git indexes. The update to swirl means the jobs can take a connection directly, changing their environment type to `()` and making them much easier to test.
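The "connection in and out of a test DB pool" trick hinges on `Arc::try_unwrap`: once the runner has dropped its clones of the pool, the test can recover exclusive ownership of the connection. A simplified sketch, with a std `Mutex<String>` as a hypothetical stand-in for the `ReentrantMutex<PgConnection>` and `unwrap_shared` as an analogue of `unwrap_test_conn`:

```rust
use std::sync::{Arc, Mutex};

// Recover the inner value only if no clone of the shared handle is alive;
// otherwise hand the Arc back, mirroring `DieselPool::unwrap_test_conn`.
fn unwrap_shared<T>(shared: Arc<Mutex<T>>) -> Result<T, Arc<Mutex<T>>> {
    Arc::try_unwrap(shared).map(|mutex| mutex.into_inner().unwrap())
}

fn main() {
    let pool = Arc::new(Mutex::new(String::from("conn")));
    let runner_clone = Arc::clone(&pool);

    // While the runner still holds a clone, unwrapping fails and we get
    // the Arc back instead of the connection.
    let pool = unwrap_shared(pool).unwrap_err();

    // Once the runner is done and drops its clone, we recover ownership.
    drop(runner_clone);
    assert_eq!(unwrap_shared(pool).unwrap(), "conn");
}
```

This is why the test helper builds the runner inside a block: the runner's clone of the pool must be dropped before `unwrap_test_conn` can succeed.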
1 parent 97d7fc8 commit 9d4b2a3

2 files changed: +74 −21 lines changed

src/db.rs

Lines changed: 10 additions & 1 deletion
@@ -31,9 +31,18 @@ impl DieselPool {
         }
     }

-    fn test_conn(conn: PgConnection) -> Self {
+    pub fn test_conn(conn: PgConnection) -> Self {
         DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
     }
+
+    pub fn unwrap_test_conn(self) -> Result<PgConnection, Self> {
+        match self {
+            DieselPool::Test(shared_conn) => Arc::try_unwrap(shared_conn)
+                .map(|c| c.into_inner())
+                .map_err(Self::Test),
+            other => Err(other),
+        }
+    }
 }

 #[allow(missing_debug_implementations)]

src/tasks/update_downloads.rs

Lines changed: 64 additions & 20 deletions
@@ -6,25 +6,34 @@ use crate::{
 use diesel::prelude::*;
 use swirl::PerformError;

+#[cfg(not(test))]
+const ROWS_PER_BATCH: i64 = 1000;
+
+#[cfg(test)]
+const ROWS_PER_BATCH: i64 = 1;
+
 #[swirl::background_job]
 pub fn update_downloads(conn: &PgConnection) -> Result<(), PerformError> {
-    update(&conn)?;
-    Ok(())
-}
-
-fn update(conn: &PgConnection) -> QueryResult<()> {
     use self::version_downloads::dsl::*;
     use diesel::dsl::now;
     use diesel::select;

-    let rows = version_downloads
-        .filter(processed.eq(false))
-        .filter(downloads.ne(counted))
-        .load(conn)?;
-
-    println!("Updating {} versions", rows.len());
-    collect(conn, &rows)?;
-    println!("Finished updating versions");
+    println!("Enqueuing jobs to count downloads");
+    let mut last_id = Some(0);
+    while let Some(id) = last_id {
+        let rows = version_downloads
+            .filter(processed.eq(false))
+            .filter(downloads.ne(counted))
+            .filter(version_id.gt(id))
+            .limit(ROWS_PER_BATCH)
+            .select(version_id)
+            .load(conn)?;
+        last_id = rows.last().copied();
+        if let Some(max_id) = last_id {
+            update_downloads_batch(id, max_id).enqueue(&conn)?;
+        }
+    }
+    println!("Finished enqueuing jobs");

     // Anything older than 24 hours ago will be frozen and will not be queried
     // against again.
@@ -43,6 +52,23 @@ fn update(conn: &PgConnection) -> QueryResult<()> {
     Ok(())
 }

+#[swirl::background_job]
+pub fn update_downloads_batch(
+    conn: &PgConnection,
+    min_version_id: i32,
+    max_version_id: i32,
+) -> Result<(), PerformError> {
+    use self::version_downloads::dsl::*;
+
+    let rows = version_downloads
+        .filter(processed.eq(false))
+        .filter(downloads.ne(counted))
+        .filter(version_id.between(min_version_id, max_version_id))
+        .load(conn)?;
+    collect(conn, &rows)?;
+    Ok(())
+}
+
 fn collect(conn: &PgConnection, rows: &[VersionDownload]) -> QueryResult<()> {
     use diesel::update;

@@ -89,6 +115,24 @@ mod test {
     };
     use std::collections::HashMap;

+    fn run_update(conn: PgConnection) -> PgConnection {
+        use crate::db::DieselPool;
+        use swirl::{Job, Runner};
+
+        super::update_downloads().enqueue(&conn).unwrap();
+        let pool = DieselPool::test_conn(conn);
+        {
+            let runner = Runner::builder(())
+                .thread_count(1)
+                .connection_pool(pool.clone())
+                .build();
+            runner.run_all_pending_jobs().unwrap();
+            runner.check_for_failed_jobs().unwrap();
+        }
+        pool.unwrap_test_conn()
+            .unwrap_or_else(|_| panic!("couldn't unwrap pool"))
+    }
+
     fn conn() -> PgConnection {
         let conn = PgConnection::establish(&env("TEST_DATABASE_URL")).unwrap();
         conn.begin_test_transaction().unwrap();
@@ -142,7 +186,7 @@ mod test {
         .execute(&conn)
         .unwrap();

-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let version_downloads = versions::table
             .find(version.id)
             .select(versions::downloads)
@@ -153,7 +197,7 @@ mod test {
             .select(crates::downloads)
             .first(&conn);
         assert_eq!(Ok(1), crate_downloads);
-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let version_downloads = versions::table
             .find(version.id)
             .select(versions::downloads)
@@ -178,7 +222,7 @@ mod test {
         ))
         .execute(&conn)
         .unwrap();
-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let processed = version_downloads::table
             .filter(version_downloads::version_id.eq(version.id))
             .select(version_downloads::processed)
@@ -202,7 +246,7 @@ mod test {
         ))
         .execute(&conn)
         .unwrap();
-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let processed = version_downloads::table
             .filter(version_downloads::version_id.eq(version.id))
             .select(version_downloads::processed)
@@ -252,7 +296,7 @@ mod test {
             .filter(crates::id.eq(krate.id))
             .first::<Crate>(&conn)
             .unwrap();
-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let version2 = versions::table
             .find(version.id)
             .first::<Version>(&conn)
@@ -265,7 +309,7 @@ mod test {
             .unwrap();
         assert_eq!(krate2.downloads, 2);
         assert_eq!(krate2.updated_at, krate_before.updated_at);
-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let version3 = versions::table
             .find(version.id)
             .first::<Version>(&conn)
@@ -300,7 +344,7 @@ mod test {
         .execute(&conn)
         .unwrap();

-        super::update(&conn).unwrap();
+        let conn = run_update(conn);
         let versions_changed = versions::table
             .select(versions::updated_at.ne(now - 2.days()))
             .get_result(&conn);
