Only enqueue 1 update_downloads job at a time #2539

Merged
merged 2 commits on Jul 5, 2020
17 changes: 16 additions & 1 deletion src/bin/enqueue-job.rs
@@ -1,6 +1,8 @@
 #![deny(clippy::all)]
 
 use cargo_registry::{db, env, tasks, util::Error};
+use diesel::prelude::*;
+use swirl::schema::background_jobs::dsl::*;
 use swirl::Job;
 
 fn main() -> Result<(), Error> {
@@ -11,7 +13,20 @@ fn main() -> Result<(), Error> {
     println!("Enqueueing background job: {}", job);
 
     match &*job {
-        "update_downloads" => Ok(tasks::update_downloads().enqueue(&conn)?),
+        "update_downloads" => {
+            let count: i64 = background_jobs
+                .filter(job_type.eq("update_downloads"))
+                .count()
+                .get_result(&conn)
+                .unwrap();
+
+            if count > 0 {
+                println!("Did not enqueue update_downloads, existing job already in progress");
+                Ok(())
+            } else {
+                Ok(tasks::update_downloads().enqueue(&conn)?)
+            }
Comment on lines +23 to +28
Member

First getting the count and then adding the job might end up not adding the job, even though the queue is empty, if a race condition happens. I guess it doesn't matter much though: the next job will only have more work to do.

Member Author

Yeah, if a job is already in the queue at the beginning of enqueue-job, then it is fine if the background job is not enqueued. It might be possible to consolidate the actions into a single atomic query, but then we would effectively be re-implementing enqueue from swirl, and that would probably be harder to maintain.

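For illustration, a single atomic query along the lines discussed above could fold the existence check into the INSERT itself, so both steps happen in one statement. The sketch below is not part of this PR: the `data` column and its empty payload are assumptions about swirl's background_jobs schema, and without a unique constraint two concurrent transactions could still both insert.

use diesel::prelude::*;
use diesel::sql_query;

// Hypothetical sketch (not this PR's approach): enqueue `update_downloads`
// only if no job of that type is already queued, in a single SQL statement.
// The `data` column and the empty '{}' payload are assumptions about how
// swirl stores job arguments.
fn enqueue_update_downloads_if_absent(conn: &PgConnection) -> QueryResult<usize> {
    sql_query(
        "INSERT INTO background_jobs (job_type, data) \
         SELECT 'update_downloads', '{}'::jsonb \
         WHERE NOT EXISTS ( \
             SELECT 1 FROM background_jobs WHERE job_type = 'update_downloads' \
         )",
    )
    .execute(conn)
}

As the author notes, swirl's own enqueue already owns this insert, so duplicating it here would carry a maintenance cost; the count-then-enqueue approach in this PR accepts the benign race instead.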
+        }
         "dump_db" => {
             let database_url = args.next().unwrap_or_else(|| env("READ_ONLY_REPLICA_URL"));
             let target_name = args
54 changes: 51 additions & 3 deletions src/bin/monitor.rs
@@ -14,20 +14,30 @@ use diesel::prelude::*;
 fn main() -> Result<(), Error> {
     let conn = db::connect_now()?;
 
-    check_stalled_background_jobs(&conn)?;
+    check_failing_background_jobs(&conn)?;
+    check_stalled_update_downloads(&conn)?;
     check_spam_attack(&conn)?;
     Ok(())
 }
 
-fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
+/// Check for old background jobs that are not currently running.
+///
+/// This check includes `skip_locked` in the query and will only trigger on
+/// enqueued jobs that have attempted to run and have failed (and are in the
+/// queue awaiting a retry).
+///
+/// Within the default 15 minute time, a job should have already had several
+/// failed retry attempts.
+fn check_failing_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::schema::background_jobs::dsl::*;
     use diesel::dsl::*;
     use diesel::sql_types::Integer;
 
     const EVENT_KEY: &str = "background_jobs";
 
-    println!("Checking for stalled background jobs");
+    println!("Checking for failed background jobs");
 
+    // Max job execution time in minutes
     let max_job_time = dotenv::var("MAX_JOB_TIME")
         .map(|s| s.parse::<i32>().unwrap())
         .unwrap_or(15);
@@ -59,6 +69,44 @@ fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     Ok(())
 }
 
+/// Check for an `update_downloads` job that has run longer than expected
+fn check_stalled_update_downloads(conn: &PgConnection) -> Result<(), Error> {
+    use cargo_registry::schema::background_jobs::dsl::*;
+    use chrono::{DateTime, NaiveDateTime, Utc};
+
+    const EVENT_KEY: &str = "update_downloads_stalled";
+
+    println!("Checking for stalled background jobs");
+
+    // Max job execution time in minutes
+    let max_job_time = dotenv::var("MONITOR_MAX_UPDATE_DOWNLOADS_TIME")
+        .map(|s| s.parse::<u32>().unwrap() as i64)
+        .unwrap_or(120);
Member

What's the unit for this? Could you clarify that in comments?

Looking at the code below, you compare it with minutes, so it would be 2 hours. If that's the case, I don't see the point of this monitoring, as check_stalled_background_jobs fires if a job was in the queue for more than 15 minutes, alerting us 1hr45min before this check fires.

Member Author

Yes, the unit is in minutes. Added a new commit to add comments.

The existing check_stalled_background_jobs check includes skip_locked, so tasks that are currently running are not caught by that check. In that context, "stalled" means that the job has failed (and likely been retried several times) but is not currently running. I've renamed the function to be a bit more clear and have added more context in the doc comment.

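To illustrate that distinction, the sketch below shows the general shape of a query that uses skip_locked; it is not the actual query in monitor.rs (that code is collapsed in this diff). Rows locked by a worker that is currently running a job are skipped, so an actively executing job never matches, while a failed job waiting for a retry does once it is older than the cutoff.

use cargo_registry::schema::background_jobs::dsl::*;
use chrono::{Duration, Utc};
use diesel::prelude::*;

// Illustrative sketch only: select jobs that have been queued longer than the
// allowed time and are not locked by a running worker. A job that is actively
// executing holds a row lock, so SKIP LOCKED hides it from this query.
fn old_unlocked_job_ids(conn: &PgConnection, max_job_time_minutes: i64) -> QueryResult<Vec<i64>> {
    let cutoff = Utc::now().naive_utc() - Duration::minutes(max_job_time_minutes);

    background_jobs
        .select(id)
        .filter(created_at.lt(cutoff))
        .for_update()
        .skip_locked()
        .load(conn)
}

A long-running update_downloads job is therefore invisible to check_failing_background_jobs, which is the gap the new check_stalled_update_downloads check fills by looking at created_at directly.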
+
+    let start_time = background_jobs
+        .filter(job_type.eq("update_downloads"))
+        .select(created_at)
Member

I'm not familiar with the crates.io schema that much: is this when the job was queued or started? If this is when it's queued, it might get false alerts.

Member Author

This is the time the job was first enqueued. Given the long (2 hour) default time, even if there is a delay in starting the job after it is enqueued, there should not be false alerts. The job typically completes in <10 minutes, and is only expected to run longer when someone downloads all versions of all crates in quick succession. The long default time is intended to allow the job plenty of time to complete before alerting, even in this occasional extreme case.

+        .first::<NaiveDateTime>(conn);
+
+    if let Ok(start_time) = start_time {
+        let start_time = DateTime::<Utc>::from_utc(start_time, Utc);
+        let minutes = Utc::now().signed_duration_since(start_time).num_minutes();
+
+        if minutes > max_job_time {
+            return log_and_trigger_event(on_call::Event::Trigger {
+                incident_key: Some(EVENT_KEY.into()),
+                description: format!("update_downloads job running for {} minutes", minutes),
+            });
+        }
+    };
+
+    log_and_trigger_event(on_call::Event::Resolve {
+        incident_key: EVENT_KEY.into(),
+        description: Some("No stalled update_downloads job".into()),
+    })
+}
+
+/// Check for known spam patterns
 fn check_spam_attack(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::models::krate::canon_crate_name;
     use diesel::dsl::*;