Skip to content

Commit de0ce58

Browse files
committed
chore: fix update_or_create_task_run correct on ScheduleTask
1 parent da12e5c commit de0ce58

File tree

2 files changed

+29
-15
lines changed

2 files changed

+29
-15
lines changed

src/meta/app/src/principal/task.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,15 @@ impl TaskMessage {
136136
0
137137
}
138138

139+
/// Returns the inclusive range of key prefixes used by `TaskMessage`.
140+
///
141+
/// This range can be used to scan all keys generated by `TaskMessage::key()`
142+
/// and related methods (e.g., `schedule_key`). The prefix `0` is prepended
143+
/// to all task-related keys to group them under the same prefix range,
144+
/// enabling efficient key scanning or iteration.
145+
///
146+
/// The returned range is (0, 1), which includes all keys starting with `0-`
147+
/// (as produced by `TaskMessage::prefix()`), and excludes any other unrelated prefixes.
139148
pub fn prefix_range() -> (i64, i64) {
140149
(0, 1)
141150
}

src/query/service/src/task/service.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -234,21 +234,24 @@ impl TaskService {
234234
.await
235235
};
236236

237-
task_service
238-
.update_or_create_task_run(&TaskRun {
239-
task: task.clone(),
240-
run_id: Self::make_run_id(),
241-
attempt_number: task.suspend_task_after_num_failures.unwrap_or(0)
242-
as i32,
243-
state: State::Scheduled,
244-
scheduled_at: Utc::now(),
245-
completed_at: None,
246-
error_code: 0,
247-
error_message: None,
248-
root_task_id: EMPTY_TASK_ID,
249-
})
250-
.await?;
251-
237+
let fn_new_task_run = async |task_service: &TaskService, task: &Task| {
238+
task_service
239+
.update_or_create_task_run(&TaskRun {
240+
task: task.clone(),
241+
run_id: Self::make_run_id(),
242+
attempt_number: task
243+
.suspend_task_after_num_failures
244+
.unwrap_or(0)
245+
as i32,
246+
state: State::Scheduled,
247+
scheduled_at: Utc::now(),
248+
completed_at: None,
249+
error_code: 0,
250+
error_message: None,
251+
root_task_id: EMPTY_TASK_ID,
252+
})
253+
.await
254+
};
252255
match schedule_options.schedule_type {
253256
ScheduleType::IntervalType => {
254257
let task_mgr = task_mgr.clone();
@@ -269,6 +272,7 @@ impl TaskService {
269272
let Some(_guard) = fn_lock(&task_service, &task_key, duration.as_millis() as u64).await? else {
270273
continue;
271274
};
275+
fn_new_task_run(&task_service, &task).await?;
272276
task_mgr.send(TaskMessage::ExecuteTask(task.clone())).await?;
273277
}
274278
_ = child_token.cancelled() => {
@@ -312,6 +316,7 @@ impl TaskService {
312316
let Some(_guard) = fn_lock(&task_service, &task_key, duration.as_millis() as u64).await? else {
313317
continue;
314318
};
319+
fn_new_task_run(&task_service, &task).await?;
315320
task_mgr.send(TaskMessage::ExecuteTask(task.clone())).await?;
316321
}
317322
_ = child_token.cancelled() => {

0 commit comments

Comments
 (0)