Skip to content

Commit efb657c

Browse files
committed
Allow depending on tasks from an earlier submit in the jobfile
1 parent 34196df commit efb657c

File tree

4 files changed

+119
-15
lines changed

4 files changed

+119
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* `hq job progress` stopped using periodic polling.
2727
In the current version, the client passively waits for events.
2828
It saves some resources as the server is not queried every second.
29+
* You can now depend on tasks from an earlier submit when submitting a jobfile.
2930

3031
## v0.24.0
3132

crates/hyperqueue/src/client/commands/submit/jobfile.rs

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ use crate::client::commands::submit::defs::{PinMode as PinModeDef, TaskConfigDef
88
use crate::client::globalsettings::GlobalSettings;
99
use crate::common::arraydef::IntArray;
1010
use crate::common::utils::fs::get_current_dir;
11+
use crate::rpc_call;
1112
use crate::transfer::connection::ClientSession;
1213
use crate::transfer::messages::{
13-
JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, SubmitRequest,
14-
TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
14+
FromClientMessage, IdSelector, JobDescription, JobDetailRequest, JobSubmitDescription,
15+
JobTaskDescription, PinMode, SubmitRequest, TaskDescription, TaskIdSelector, TaskKind,
16+
TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
1517
};
1618
use clap::Parser;
1719
use smallvec::smallvec;
1820
use std::path::PathBuf;
19-
use tako::Map;
2021
use tako::gateway::{EntryType, ResourceRequest, ResourceRequestVariants, TaskDataFlags};
2122
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
2223
use tako::{JobId, JobTaskCount, JobTaskId};
24+
use tako::{Map, Set};
2325

2426
#[derive(Parser)]
2527
pub struct JobSubmitFileOpts {
@@ -130,22 +132,33 @@ fn build_job_desc_individual_tasks(
130132
tasks: Vec<TaskDef>,
131133
data_flags: TaskDataFlags,
132134
has_streaming: bool,
135+
existing_tasks: &[JobTaskId],
133136
) -> crate::Result<JobTaskDescription> {
134137
let mut max_id: JobTaskId = tasks
135138
.iter()
136139
.map(|t| t.id)
140+
.chain(existing_tasks.iter().copied().map(Some))
137141
.max()
138142
.flatten()
139143
.unwrap_or(JobTaskId::new(0));
140144

145+
let existing_tasks: Set<JobTaskId> = existing_tasks.iter().copied().collect();
146+
141147
/* Topological sort */
142148
let original_len = tasks.len();
143149
let mut new_tasks = Vec::with_capacity(original_len);
144150
let mut unprocessed_tasks = Map::new();
145151
let mut in_degrees = Map::new();
146152
let mut consumers: Map<JobTaskId, Vec<_>> = Map::new();
153+
let mut deps_from_previous_submit: Map<JobTaskId, Vec<_>> = Map::new();
147154
for task in tasks {
148155
let t = build_task(task, &mut max_id, data_flags, has_streaming);
156+
if existing_tasks.contains(&t.id) {
157+
return Err(crate::Error::GenericError(format!(
158+
"Task {} has already been defined in this job",
159+
t.id
160+
)));
161+
}
149162
if in_degrees.insert(t.id, t.task_deps.len()).is_some() {
150163
return Err(crate::Error::GenericError(format!(
151164
"Task {} is defined multiple times",
@@ -157,14 +170,21 @@ fn build_job_desc_individual_tasks(
157170
new_tasks.push(t);
158171
} else {
159172
for dep in &t.task_deps {
160-
consumers.entry(*dep).or_default().push(t.id);
173+
if existing_tasks.contains(dep) {
174+
deps_from_previous_submit
175+
.entry(*dep)
176+
.or_default()
177+
.push(t.id);
178+
} else {
179+
consumers.entry(*dep).or_default().push(t.id);
180+
}
161181
}
162182
unprocessed_tasks.insert(t.id, t);
163183
}
164184
}
165-
let mut idx = 0;
166-
while idx < new_tasks.len() {
167-
if let Some(consumers) = consumers.get(&new_tasks[idx].id) {
185+
186+
let mut handle_consumers =
187+
|consumers: &[JobTaskId], new_tasks: &mut Vec<TaskWithDependencies>| {
168188
for c in consumers {
169189
let d = in_degrees.get_mut(c).unwrap();
170190
assert!(*d > 0);
@@ -173,6 +193,16 @@ fn build_job_desc_individual_tasks(
173193
new_tasks.push(unprocessed_tasks.remove(c).unwrap())
174194
}
175195
}
196+
};
197+
198+
for consumers in deps_from_previous_submit.values() {
199+
handle_consumers(consumers, &mut new_tasks);
200+
}
201+
202+
let mut idx = 0;
203+
while idx < new_tasks.len() {
204+
if let Some(consumers) = consumers.get(&new_tasks[idx].id) {
205+
handle_consumers(consumers, &mut new_tasks);
176206
}
177207
idx += 1;
178208
}
@@ -182,23 +212,35 @@ fn build_job_desc_individual_tasks(
182212
} else {
183213
let t = unprocessed_tasks.values().next().unwrap();
184214
return Err(crate::Error::GenericError(format!(
185-
"Task {} is part of dependency cycle or has an invalid dependencies",
215+
"Task {} is part of dependency cycle or has invalid dependencies",
186216
t.id
187217
)));
188218
}
189219

190220
Ok(JobTaskDescription::Graph { tasks: new_tasks })
191221
}
192222

193-
fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<SubmitRequest> {
223+
fn build_job_submit(
224+
jdef: JobDef,
225+
job_info: Option<(JobId, Vec<JobTaskId>)>,
226+
) -> crate::Result<SubmitRequest> {
194227
let task_desc = if let Some(array) = jdef.array {
195228
build_job_desc_array(array, jdef.stream.is_some())
196229
} else {
197230
let mut data_flags = TaskDataFlags::empty();
198231
if jdef.data_layer {
199232
data_flags.insert(TaskDataFlags::ENABLE_DATA_LAYER);
200233
}
201-
build_job_desc_individual_tasks(jdef.tasks, data_flags, jdef.stream.is_some())?
234+
let existing_tasks = job_info
235+
.as_ref()
236+
.map(|(_, tasks)| tasks.as_slice())
237+
.unwrap_or_default();
238+
build_job_desc_individual_tasks(
239+
jdef.tasks,
240+
data_flags,
241+
jdef.stream.is_some(),
242+
existing_tasks,
243+
)?
202244
};
203245
Ok(SubmitRequest {
204246
job_desc: JobDescription {
@@ -210,7 +252,7 @@ fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<Submit
210252
submit_dir: get_current_dir(),
211253
stream_path: jdef.stream,
212254
},
213-
job_id,
255+
job_id: job_info.map(|j| j.0),
214256
})
215257
}
216258

@@ -225,6 +267,25 @@ pub async fn submit_computation_from_job_file(
225267
anyhow::anyhow!(format!("Cannot read {}: {}", opts.path.display(), e))
226268
})?)?
227269
};
228-
let request = build_job_submit(jdef, opts.job)?;
270+
271+
let job_info = if let Some(job_id) = opts.job {
272+
let mut response =
273+
rpc_call!(session.connection(), FromClientMessage::JobDetail(JobDetailRequest {
274+
job_id_selector: IdSelector::Specific(IntArray::from_id(job_id.as_num())),
275+
task_selector: Some(TaskSelector {
276+
id_selector: TaskIdSelector::All,
277+
status_selector: TaskStatusSelector::All
278+
})
279+
}), ToClientMessage::JobDetailResponse(r) => r)
280+
.await?;
281+
let Some(job) = response.details.pop().and_then(|(_, detail)| detail) else {
282+
return Err(anyhow::anyhow!("Job {job_id} not found"));
283+
};
284+
Some((job_id, job.tasks.into_iter().map(|(id, _)| id).collect()))
285+
} else {
286+
None
287+
};
288+
289+
let request = build_job_submit(jdef, job_info)?;
229290
send_submit_request(gsettings, session, request, false, false, None).await
230291
}

docs/jobs/jobfile.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@ command = ["sleep", "1"]
120120

121121
## Task dependencies
122122

123-
Job Definition File allows to define a dependencies between tasks. In other words,
123+
Job Definition File allows to define dependencies between tasks. In other words,
124124
it means that the task may be executed only if the previous tasks are already finished.
125125

126-
The task's option `deps` defines on which tasks the given task dependents.
127-
The task is addressed by their IDs.
126+
The task's option `deps` defines on which tasks the given task depends.
127+
Each dependency is identified by a task ID.
128128

129129
The following example creates three tasks where the third task depends on the first two tasks.
130130

@@ -143,6 +143,8 @@ command = [...]
143143
deps = [1, 3] # <---- Dependency on tasks 1 and 3
144144
```
145145

146+
Dependencies can also refer to tasks from [earlier submits](openjobs.md) in the same job.
147+
146148
## Resource variants
147149

148150
More resource configurations may be defined for a task.

tests/test_jobfile.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,46 @@ def test_job_file_dependencies_with_cycle(hq_env: HqEnv, tmp_path):
391391
hq_env.command(["job", "submit-file", "job.toml"], expect_fail="cycle")
392392

393393

394+
def test_job_file_dependencies_from_previous_submit(hq_env: HqEnv, tmp_path):
395+
hq_env.start_server()
396+
tmp_path.joinpath("job1.toml").write_text(
397+
"""
398+
[[task]]
399+
id = 1
400+
command = ["sleep", "2"]
401+
"""
402+
)
403+
404+
hq_env.command(["job", "open"])
405+
hq_env.command(["job", "submit-file", "--job", "1", "job1.toml"])
406+
407+
tmp_path.joinpath("job2.toml").write_text(
408+
"""
409+
[[task]]
410+
id = 3
411+
command = ["sleep", "1"]
412+
deps = [1]
413+
414+
[[task]]
415+
id = 5
416+
command = ["sleep", "1"]
417+
deps = [1, 3]
418+
"""
419+
)
420+
hq_env.command(["job", "submit-file", "--job", "1", "job2.toml"])
421+
hq_env.command(["job", "close", "1"])
422+
423+
table = hq_env.command(["task", "info", "1", "1"], as_table=True)
424+
table.check_row_value("Dependencies", "")
425+
table = hq_env.command(["task", "info", "1", "3"], as_table=True)
426+
table.check_row_value("Dependencies", "1")
427+
table = hq_env.command(["task", "info", "1", "5"], as_table=True)
428+
table.check_row_value("Dependencies", "1,3")
429+
430+
hq_env.start_worker()
431+
wait_for_job_state(hq_env, 1, "FINISHED")
432+
433+
394434
def test_job_file_attach(hq_env: HqEnv, tmp_path):
395435
hq_env.start_server()
396436
hq_env.command(["job", "open"])

0 commit comments

Comments
 (0)