Skip to content

Commit

Permalink
backend: simplify insert in completed_job
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Dec 31, 2024
1 parent eeece84 commit b2ebb6d
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 129 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

240 changes: 111 additions & 129 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}

let _job_id = queued_job.id;
let (opt_uuid, _duration, _skip_downstream_error_handlers) = (|| async {
let (canceled_by, canceled_reason) =
canceled_by.map_or((None, None), |c| (c.username, c.reason));
let (opt_uuid, _duration, canceled, _skip_downstream_error_handlers) = (|| async {
let mut tx = db.begin().await?;

let job_id = queued_job.id;
Expand All @@ -531,133 +533,118 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
serde_json::to_string(&result).unwrap_or_else(|_| "".to_string())
);

let (raw_code, raw_lock, raw_flow) = if !*MIN_VERSION_IS_AT_LEAST_1_427.read().await {
sqlx::query!(
"SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json<Box<JsonRawValue>>\"
FROM job WHERE id = $1 AND workspace_id = $2 LIMIT 1",
&job_id,
&queued_job.workspace_id
)
.fetch_one(db)
.map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow))
.or_else(|_| {
sqlx::query!(
"SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json<Box<JsonRawValue>>\"
FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1",
&job_id,
&queued_job.workspace_id
)
.fetch_one(db)
.map_ok(|record| (record.raw_code, record.raw_lock, record.raw_flow))
})
.await
.unwrap_or_default()
} else {
(None, None, None)
};

let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0));
// add_time!(bench, "add_completed_job query START");

let _duration = sqlx::query_scalar!(
"INSERT INTO completed_job AS cj
( workspace_id
, id
, parent_job
, created_by
, created_at
, started_at
, duration_ms
, success
, script_hash
, script_path
, args
, result
, raw_code
, raw_lock
, canceled
, canceled_by
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, is_skipped
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority
)
VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), COALESCE($30::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000), $7, $8, $9,\
$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms",
queued_job.workspace_id,
queued_job.id,
queued_job.parent_job,
queued_job.created_by,
queued_job.created_at,
queued_job.started_at,
success,
queued_job.script_hash.map(|x| x.0),
queued_job.script_path,
&queued_job.args as &Option<Json<HashMap<String, Box<RawValue>>>>,
result as Json<&T>,
raw_code,
raw_lock,
canceled_by.is_some(),
canceled_by.clone().map(|cb| cb.username).flatten(),
canceled_by.clone().map(|cb| cb.reason).flatten(),
queued_job.job_kind.clone() as JobKind,
queued_job.schedule_path,
queued_job.permissioned_as,
&queued_job.flow_status as &Option<Json<Box<RawValue>>>,
&raw_flow as &Option<Json<Box<RawValue>>>,
queued_job.is_flow_step,
skipped,
queued_job.language.clone() as Option<ScriptLang>,
queued_job.email,
queued_job.visible_to_owner,
if mem_peak > 0 { Some(mem_peak) } else { None },
queued_job.tag,
queued_job.priority,
duration,
let (_duration, canceled) = sqlx::query!(
r#"
INSERT INTO completed_job
( workspace_id
, id
, parent_job
, created_by
, created_at
, started_at
, duration_ms
, success
, script_hash
, script_path
, args
, result
, raw_code
, raw_lock
, canceled
, canceled_by
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, is_skipped
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority
)
SELECT
queue.workspace_id
, queue.id
, queue.parent_job
, queue.created_by
, queue.created_at
, queue.started_at
, COALESCE($2::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE(queue.started_at, now())))) * 1000)
, $3 AS success
, queue.script_hash
, queue.script_path
, queue.args
, $4 AS result
, queue.raw_code
, queue.raw_lock
, queue.canceled OR COALESCE($7, queue.canceled_by) IS NOT NULL as canceled
, COALESCE($7, queue.canceled_by) as canceled_by
, COALESCE($8, queue.canceled_reason) as canceled_reason
, queue.job_kind
, queue.schedule_path
, queue.permissioned_as
, queue.flow_status
, queue.raw_flow
, queue.is_flow_step
, $5 AS is_skipped
, queue.language
, queue.email
, queue.visible_to_owner
, GREATEST($6, queue.mem_peak) AS mem_peak
, queue.tag
, queue.priority
FROM queue
WHERE queue.id = $1
LIMIT 1
ON CONFLICT (id) DO UPDATE SET
success = $3,
result = $4
RETURNING duration_ms, canceled
"#,
/* $1 */ queued_job.id,
/* $2 */ duration,
/* $3 */ success,
/* $4 */ result as Json<&T>,
/* $5 */ skipped,
/* $6 */ if mem_peak > 0 { Some(mem_peak) } else { None },
/* $7 */ canceled_by,
/* $8 */ canceled_reason,
)
.fetch_one(&mut *tx)
.await
.map(|record| (record.duration_ms, record.canceled))
.map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;


// Hacky trick used by `workflow_as_code`
if !queued_job.is_flow_step {
if _duration > 500
&& (queued_job.job_kind == JobKind::Script
|| queued_job.job_kind == JobKind::Preview)
{
if let Err(e) = sqlx::query!(
"UPDATE completed_job SET flow_status = q.flow_status FROM queue q WHERE completed_job.id = $1 AND q.id = $1 AND q.workspace_id = $2 AND completed_job.workspace_id = $2 AND q.flow_status IS NOT NULL",
&queued_job.id,
&queued_job.workspace_id
)
.execute(&mut *tx)
.await {
tracing::error!("Could not update job duration: {}", e);
}
}
if let Some(parent_job) = queued_job.parent_job {
if let Err(e) = sqlx::query_scalar!(
"UPDATE queue SET flow_status = jsonb_set(jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], COALESCE(flow_status->$1, '{}'::jsonb)), array[$1, 'duration_ms'], to_jsonb($2::bigint)) WHERE id = $3 AND workspace_id = $4",
&queued_job.id.to_string(),
_duration,
parent_job,
&queued_job.workspace_id
)
.execute(&mut *tx)
.await {
tracing::error!("Could not update parent job flow_status: {}", e);
}
let _ = sqlx::query_scalar!(
r#"
UPDATE queue SET flow_status = jsonb_set(
jsonb_set(
COALESCE(flow_status, '{}'::jsonb),
array[$1],
COALESCE(flow_status->$1, '{}'::jsonb)
),
array[$1, 'duration_ms'],
to_jsonb($2::bigint)
) WHERE id = $3 AND workspace_id = $4
"#,
&queued_job.id.to_string(),
_duration,
parent_job,
&queued_job.workspace_id
)
.execute(&mut *tx)
.await
.inspect_err(|e| tracing::error!("Could not update parent job flow_status: {}", e));
}
}
// tracing::error!("Added completed job {:#?}", queued_job);
Expand Down Expand Up @@ -733,7 +720,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
match err {
Error::QuotaExceeded(_) => (),
// scheduling next job failed and could not disable schedule => make zombie job to retry
_ => return Ok((Some(job_id), 0, true)),
_ => return Ok((Some(job_id), 0, canceled, true)),
}
};
}
Expand Down Expand Up @@ -844,7 +831,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
"inserted completed job: {} (success: {success})",
queued_job.id
);
Ok((None, _duration, _skip_downstream_error_handlers)) as windmill_common::error::Result<(Option<Uuid>, i64, bool)>
Ok((None, _duration, canceled, _skip_downstream_error_handlers)) as error::Result<(Option<Uuid>, i64, bool, bool)>
})
.retry(
ConstantBuilder::default()
Expand Down Expand Up @@ -976,13 +963,8 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
);
}

if let Err(err) = send_error_to_workspace_handler(
&queued_job,
canceled_by.is_some(),
db,
Json(&result),
)
.await
if let Err(err) =
send_error_to_workspace_handler(&queued_job, canceled, db, Json(&result)).await
{
match err {
Error::QuotaExceeded(_) => {}
Expand All @@ -1001,7 +983,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
}
}

if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && canceled_by.is_none() {
if !queued_job.is_flow_step && queued_job.job_kind == JobKind::Script && !canceled {
if let Some(hash) = queued_job.script_hash {
let p = sqlx::query_scalar!(
"SELECT restart_unless_cancelled FROM script WHERE hash = $1 AND workspace_id = $2",
Expand Down

0 comments on commit b2ebb6d

Please sign in to comment.