Skip to content

Commit

Permalink
backend: simplify insert in completed_job
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Dec 31, 2024
1 parent eeece84 commit afb0a55
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 102 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

192 changes: 90 additions & 102 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}

let _job_id = queued_job.id;
let (opt_uuid, _duration, _skip_downstream_error_handlers) = (|| async {
let (canceled_by, canceled_reason) =
canceled_by.map_or((None, None), |c| (c.username, c.reason));
let (opt_uuid, _duration, canceled, _skip_downstream_error_handlers) = (|| async {
let mut tx = db.begin().await?;

let job_id = queued_job.id;
Expand All @@ -531,103 +533,94 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
serde_json::to_string(&result).unwrap_or_else(|_| "".to_string())
);

let (raw_code, raw_lock, raw_flow) = if !*MIN_VERSION_IS_AT_LEAST_1_427.read().await {
sqlx::query!(
"SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json<Box<JsonRawValue>>\"
FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1",
&job_id,
&queued_job.workspace_id
)
.fetch_one(db)
.map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow))
.or_else(|_| {
sqlx::query!(
"SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json<Box<JsonRawValue>>\"
FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1",
&job_id,
&queued_job.workspace_id
)
.fetch_one(db)
.map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow))
})
.await
.unwrap_or_default()
} else {
(None, None, None)
};

let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0));
// add_time!(bench, "add_completed_job query START");

let _duration = sqlx::query_scalar!(
"INSERT INTO completed_job AS cj
( workspace_id
, id
, parent_job
, created_by
, created_at
, started_at
, duration_ms
, success
, script_hash
, script_path
, args
, result
, raw_code
, raw_lock
, canceled
, canceled_by
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, is_skipped
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority
)
VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), COALESCE($30::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000), $7, $8, $9,\
$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms",
queued_job.workspace_id,
queued_job.id,
queued_job.parent_job,
queued_job.created_by,
queued_job.created_at,
queued_job.started_at,
success,
queued_job.script_hash.map(|x| x.0),
queued_job.script_path,
&queued_job.args as &Option<Json<HashMap<String, Box<RawValue>>>>,
result as Json<&T>,
raw_code,
raw_lock,
canceled_by.is_some(),
canceled_by.clone().map(|cb| cb.username).flatten(),
canceled_by.clone().map(|cb| cb.reason).flatten(),
queued_job.job_kind.clone() as JobKind,
queued_job.schedule_path,
queued_job.permissioned_as,
&queued_job.flow_status as &Option<Json<Box<RawValue>>>,
&raw_flow as &Option<Json<Box<RawValue>>>,
queued_job.is_flow_step,
skipped,
queued_job.language.clone() as Option<ScriptLang>,
queued_job.email,
queued_job.visible_to_owner,
if mem_peak > 0 { Some(mem_peak) } else { None },
queued_job.tag,
queued_job.priority,
duration,
let (_duration, canceled) = sqlx::query!(
r#"
INSERT INTO completed_job
( workspace_id
, id
, parent_job
, created_by
, created_at
, started_at
, duration_ms
, success
, script_hash
, script_path
, args
, result
, raw_code
, raw_lock
, canceled
, canceled_by
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, is_skipped
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority
)
SELECT
queue.workspace_id
, queue.id
, queue.parent_job
, queue.created_by
, queue.created_at
, queue.started_at
, COALESCE($2::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE(queue.started_at, now())))) * 1000)
, $3 AS success
, queue.script_hash
, queue.script_path
, queue.args
, $4 AS result
, queue.raw_code
, queue.raw_lock
, queue.canceled OR COALESCE($7, queue.canceled_by) IS NOT NULL as canceled
, COALESCE($7, queue.canceled_by) as canceled_by
, COALESCE($8, queue.canceled_reason) as canceled_reason
, queue.job_kind
, queue.schedule_path
, queue.permissioned_as
, queue.flow_status
, queue.raw_flow
, queue.is_flow_step
, $5 AS is_skipped
, queue.language
, queue.email
, queue.visible_to_owner
, COALESCE($6, queue.mem_peak) AS mem_peak
, queue.tag
, queue.priority
FROM queue
WHERE queue.id = $1
LIMIT 1
ON CONFLICT (id) DO UPDATE SET
success = $3,
result = $4
RETURNING duration_ms, canceled
"#,
/* $1 */ queued_job.id,
/* $2 */ duration,
/* $3 */ success,
/* $4 */ result as Json<&T>,
/* $5 */ skipped,
/* $6 */ if mem_peak > 0 { Some(mem_peak) } else { None },
/* $7 */ canceled_by,
/* $8 */ canceled_reason,
)
.fetch_one(&mut *tx)
.await
.map(|record| (record.duration_ms, record.canceled))
.map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;


Expand Down Expand Up @@ -733,7 +726,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
match err {
Error::QuotaExceeded(_) => (),
// scheduling next job failed and could not disable schedule => make zombie job to retry
_ => return Ok((Some(job_id), 0, true)),
_ => return Ok((Some(job_id), 0, canceled, true)),
}
};
}
Expand Down Expand Up @@ -844,7 +837,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
"inserted completed job: {} (success: {success})",
queued_job.id
);
Ok((None, _duration, _skip_downstream_error_handlers)) as windmill_common::error::Result<(Option<Uuid>, i64, bool)>
Ok((None, _duration, canceled, _skip_downstream_error_handlers)) as error::Result<(Option<Uuid>, i64, bool, bool)>
})
.retry(
ConstantBuilder::default()
Expand Down Expand Up @@ -976,13 +969,8 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
);
}

if let Err(err) = send_error_to_workspace_handler(
&queued_job,
canceled_by.is_some(),
db,
Json(&result),
)
.await
if let Err(err) =
send_error_to_workspace_handler(&queued_job, canceled, db, Json(&result)).await
{
match err {
Error::QuotaExceeded(_) => {}
Expand All @@ -1001,7 +989,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}
}

if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && canceled_by.is_none() {
if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && !canceled {
if let Some(hash) = queued_job.script_hash {
let p = sqlx::query_scalar!(
"SELECT restart_unless_cancelled FROM script WHERE hash = $1 AND workspace_id = $2",
Expand Down

0 comments on commit afb0a55

Please sign in to comment.