Commit 8602f09

Auto merge of #2539 - jtgeibel:there-can-only-be-one, r=pietroalbini
Only enqueue 1 update_downloads job at a time

This ensures that if an `update_downloads` job is already running, a duplicate job will not be enqueued. Currently, when multiple jobs are running in parallel, they end up doing duplicate work, resulting in temporary overcounts that must be corrected in the next run. The concurrent tasks also slow down the overall process and can result in runaway performance problems as further jobs are spawned.

This commit also updates the monitoring to specifically check if the update_downloads job runs for too long (120 minutes by default). The main check for stalled jobs will not trigger for `update_downloads`, as the row is locked for the duration of the job (and `skip_locked` is used in that query).

r? @pietroalbini
2 parents 44773ef + a0b3e7b
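As background for the commit message above: swirl keeps each background job as a row in the background_jobs table, the worker running a job holds a row lock on it for the duration of the run, and queries that use `skip_locked` simply do not see that locked row instead of blocking on it. The snippet below is a minimal, illustrative sketch of this FOR UPDATE SKIP LOCKED pattern, not the actual swirl or crates.io code; the table definition and function name are assumptions made for the example.

// A minimal sketch of the FOR UPDATE SKIP LOCKED pattern referred to above.
// The table! definition and function are illustrative assumptions for this
// example, not the real swirl schema or runner code.
#[macro_use]
extern crate diesel;

use diesel::prelude::*;

table! {
    background_jobs (id) {
        id -> Int8,
        job_type -> Text,
    }
}

/// Pick one job row to work on, skipping rows another worker has locked.
fn lock_one_job(conn: &PgConnection) -> QueryResult<i64> {
    use self::background_jobs::dsl::*;

    background_jobs
        .select(id)
        .order(id)
        .for_update()   // generates FOR UPDATE: hold a row lock until the transaction ends
        .skip_locked()  // generates SKIP LOCKED: ignore rows locked by a running worker
        .first(conn)
}

Because the stalled-job monitor query also uses `skip_locked`, a job that is actively running (and therefore locked) never shows up in it, which is why the separate `check_stalled_update_downloads` check added below is needed.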

2 files changed: +67 -4 lines

src/bin/enqueue-job.rs

Lines changed: 16 additions & 1 deletion
@@ -1,6 +1,8 @@
 #![deny(clippy::all)]
 
 use cargo_registry::{db, env, tasks, util::Error};
+use diesel::prelude::*;
+use swirl::schema::background_jobs::dsl::*;
 use swirl::Job;
 
 fn main() -> Result<(), Error> {
@@ -11,7 +13,20 @@ fn main() -> Result<(), Error> {
     println!("Enqueueing background job: {}", job);
 
     match &*job {
-        "update_downloads" => Ok(tasks::update_downloads().enqueue(&conn)?),
+        "update_downloads" => {
+            let count: i64 = background_jobs
+                .filter(job_type.eq("update_downloads"))
+                .count()
+                .get_result(&conn)
+                .unwrap();
+
+            if count > 0 {
+                println!("Did not enqueue update_downloads, existing job already in progress");
+                Ok(())
+            } else {
+                Ok(tasks::update_downloads().enqueue(&conn)?)
+            }
+        }
         "dump_db" => {
             let database_url = args.next().unwrap_or_else(|| env("READ_ONLY_REPLICA_URL"));
             let target_name = args

src/bin/monitor.rs

Lines changed: 51 additions & 3 deletions
@@ -14,20 +14,30 @@ use diesel::prelude::*;
 fn main() -> Result<(), Error> {
     let conn = db::connect_now()?;
 
-    check_stalled_background_jobs(&conn)?;
+    check_failing_background_jobs(&conn)?;
+    check_stalled_update_downloads(&conn)?;
     check_spam_attack(&conn)?;
     Ok(())
 }
 
-fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
+/// Check for old background jobs that are not currently running.
+///
+/// This check includes `skip_locked` in the query and will only trigger on
+/// enqueued jobs that have attempted to run and have failed (and are in the
+/// queue awaiting a retry).
+///
+/// Within the default 15 minute time, a job should have already had several
+/// failed retry attempts.
+fn check_failing_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::schema::background_jobs::dsl::*;
     use diesel::dsl::*;
     use diesel::sql_types::Integer;
 
     const EVENT_KEY: &str = "background_jobs";
 
-    println!("Checking for stalled background jobs");
+    println!("Checking for failed background jobs");
 
+    // Max job execution time in minutes
     let max_job_time = dotenv::var("MAX_JOB_TIME")
        .map(|s| s.parse::<i32>().unwrap())
        .unwrap_or(15);
@@ -59,6 +69,44 @@ fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     Ok(())
 }
 
+/// Check for an `update_downloads` job that has run longer than expected
+fn check_stalled_update_downloads(conn: &PgConnection) -> Result<(), Error> {
+    use cargo_registry::schema::background_jobs::dsl::*;
+    use chrono::{DateTime, NaiveDateTime, Utc};
+
+    const EVENT_KEY: &str = "update_downloads_stalled";
+
+    println!("Checking for stalled background jobs");
+
+    // Max job execution time in minutes
+    let max_job_time = dotenv::var("MONITOR_MAX_UPDATE_DOWNLOADS_TIME")
+        .map(|s| s.parse::<u32>().unwrap() as i64)
+        .unwrap_or(120);
+
+    let start_time = background_jobs
+        .filter(job_type.eq("update_downloads"))
+        .select(created_at)
+        .first::<NaiveDateTime>(conn);
+
+    if let Ok(start_time) = start_time {
+        let start_time = DateTime::<Utc>::from_utc(start_time, Utc);
+        let minutes = Utc::now().signed_duration_since(start_time).num_minutes();
+
+        if minutes > max_job_time {
+            return log_and_trigger_event(on_call::Event::Trigger {
+                incident_key: Some(EVENT_KEY.into()),
+                description: format!("update_downloads job running for {} minutes", minutes),
+            });
+        }
+    };
+
+    log_and_trigger_event(on_call::Event::Resolve {
+        incident_key: EVENT_KEY.into(),
+        description: Some("No stalled update_downloads job".into()),
+    })
+}
+
+/// Check for known spam patterns
 fn check_spam_attack(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::models::krate::canon_crate_name;
     use diesel::dsl::*;
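For reference, the time arithmetic in `check_stalled_update_downloads` works on the job's `created_at` value: the stored `NaiveDateTime` is tagged as UTC and then compared with `Utc::now()` to get the number of minutes the job has been running. The standalone example below reproduces that calculation with a made-up timestamp (three hours in the past), purely for illustration.

use chrono::{DateTime, Duration, Utc};

fn main() {
    // Pretend the update_downloads job row was created three hours ago
    // (a stand-in for the `created_at` value read from background_jobs).
    let created_at = (Utc::now() - Duration::hours(3)).naive_utc();

    // Same conversion and comparison as in the monitor check above.
    let start_time = DateTime::<Utc>::from_utc(created_at, Utc);
    let minutes = Utc::now().signed_duration_since(start_time).num_minutes();

    // With the default MONITOR_MAX_UPDATE_DOWNLOADS_TIME of 120 minutes,
    // this run would trigger the update_downloads_stalled event.
    assert!(minutes > 120);
    println!("update_downloads job running for {} minutes", minutes);
}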
