1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@
* `hq job progress` stopped using periodic polling.
In the current version, the client passively waits for events.
This saves resources, as the server is no longer queried every second.
* You can now depend on tasks from an earlier submit when submitting a jobfile.

## v0.24.0

77 changes: 66 additions & 11 deletions crates/hyperqueue/src/client/commands/submit/jobfile.rs
@@ -8,18 +8,20 @@ use crate::client::commands::submit::defs::{PinMode as PinModeDef, TaskConfigDef
use crate::client::globalsettings::GlobalSettings;
use crate::common::arraydef::IntArray;
use crate::common::utils::fs::get_current_dir;
use crate::rpc_call;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, SubmitRequest,
TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
FromClientMessage, IdSelector, JobDescription, JobDetailRequest, JobSubmitDescription,
JobTaskDescription, PinMode, SubmitRequest, TaskDescription, TaskIdSelector, TaskKind,
TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
};
use clap::Parser;
use smallvec::smallvec;
use std::path::PathBuf;
use tako::Map;
use tako::gateway::{EntryType, ResourceRequest, ResourceRequestVariants, TaskDataFlags};
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
use tako::{JobId, JobTaskCount, JobTaskId};
use tako::{Map, Set};

#[derive(Parser)]
pub struct JobSubmitFileOpts {
@@ -130,14 +132,18 @@ fn build_job_desc_individual_tasks(
tasks: Vec<TaskDef>,
data_flags: TaskDataFlags,
has_streaming: bool,
existing_tasks: &[JobTaskId],
) -> crate::Result<JobTaskDescription> {
let mut max_id: JobTaskId = tasks
.iter()
.map(|t| t.id)
.chain(existing_tasks.iter().copied().map(Some))
.max()
.flatten()
.unwrap_or(JobTaskId::new(0));

let existing_tasks: Set<JobTaskId> = existing_tasks.iter().copied().collect();

/* Topological sort */
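// Kahn's algorithm: a task is emitted once all of its dependencies defined in
// this submit have been emitted; dependencies on tasks from earlier submits
// are filtered out, since they do not constrain the ordering within this file.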
let original_len = tasks.len();
let mut new_tasks = Vec::with_capacity(original_len);
@@ -146,22 +152,40 @@
let mut consumers: Map<JobTaskId, Vec<_>> = Map::new();
for task in tasks {
let t = build_task(task, &mut max_id, data_flags, has_streaming);
if in_degrees.insert(t.id, t.task_deps.len()).is_some() {
if existing_tasks.contains(&t.id) {
return Err(crate::Error::GenericError(format!(
"Task {} has already been defined in this job",
t.id
)));
}

let task_deps_from_this_submit: Vec<JobTaskId> = t
.task_deps
Member Author commented:

Note: this could be

            .iter()
            .filter(|&task| !existing_tasks.contains(task))
            .copied()
            .collect();

But I think that pre-cloning might be better to avoid partial reallocations of the Vec, since the usual case is that all/most deps will be from the same submit.

.clone()
.into_iter()
.filter(|task| !existing_tasks.contains(task))
.collect();

if in_degrees
.insert(t.id, task_deps_from_this_submit.len())
.is_some()
{
return Err(crate::Error::GenericError(format!(
"Task {} is defined multiple times",
t.id
)));
}
let is_empty = t.task_deps.is_empty();
let is_empty = task_deps_from_this_submit.is_empty();
if is_empty {
new_tasks.push(t);
} else {
for dep in &t.task_deps {
for dep in &task_deps_from_this_submit {
consumers.entry(*dep).or_default().push(t.id);
}
unprocessed_tasks.insert(t.id, t);
}
}

let mut idx = 0;
while idx < new_tasks.len() {
if let Some(consumers) = consumers.get(&new_tasks[idx].id) {
@@ -182,23 +206,35 @@
} else {
let t = unprocessed_tasks.values().next().unwrap();
return Err(crate::Error::GenericError(format!(
"Task {} is part of dependency cycle or has an invalid dependencies",
"Task {} is part of dependency cycle or has invalid dependencies",
t.id
)));
}

Ok(JobTaskDescription::Graph { tasks: new_tasks })
}

fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<SubmitRequest> {
fn build_job_submit(
jdef: JobDef,
job_info: Option<(JobId, Vec<JobTaskId>)>,
) -> crate::Result<SubmitRequest> {
let task_desc = if let Some(array) = jdef.array {
build_job_desc_array(array, jdef.stream.is_some())
} else {
let mut data_flags = TaskDataFlags::empty();
if jdef.data_layer {
data_flags.insert(TaskDataFlags::ENABLE_DATA_LAYER);
}
build_job_desc_individual_tasks(jdef.tasks, data_flags, jdef.stream.is_some())?
let existing_tasks = job_info
.as_ref()
.map(|(_, tasks)| tasks.as_slice())
.unwrap_or_default();
build_job_desc_individual_tasks(
jdef.tasks,
data_flags,
jdef.stream.is_some(),
existing_tasks,
)?
};
Ok(SubmitRequest {
job_desc: JobDescription {
@@ -210,7 +246,7 @@ fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<Submit
submit_dir: get_current_dir(),
stream_path: jdef.stream,
},
job_id,
job_id: job_info.map(|j| j.0),
})
}

@@ -225,6 +261,25 @@ pub async fn submit_computation_from_job_file(
anyhow::anyhow!(format!("Cannot read {}: {}", opts.path.display(), e))
})?)?
};
let request = build_job_submit(jdef, opts.job)?;

let job_info = if let Some(job_id) = opts.job {
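// Fetch the attached job's details so that task IDs from its earlier submits
// can be used as dependencies (and checked for ID collisions) below.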
let mut response =
rpc_call!(session.connection(), FromClientMessage::JobDetail(JobDetailRequest {
job_id_selector: IdSelector::Specific(IntArray::from_id(job_id.as_num())),
task_selector: Some(TaskSelector {
id_selector: TaskIdSelector::All,
status_selector: TaskStatusSelector::All
})
}), ToClientMessage::JobDetailResponse(r) => r)
.await?;
let Some(job) = response.details.pop().and_then(|(_, detail)| detail) else {
return Err(anyhow::anyhow!("Job {job_id} not found"));
};
Some((job_id, job.tasks.into_iter().map(|(id, _)| id).collect()))
} else {
None
};

let request = build_job_submit(jdef, job_info)?;
send_submit_request(gsettings, session, request, false, false, None).await
}
8 changes: 5 additions & 3 deletions docs/jobs/jobfile.md
@@ -120,11 +120,11 @@ command = ["sleep", "1"]

## Task dependencies

Job Definition File allows to define a dependencies between tasks. In other words,
The Job Definition File allows you to define dependencies between tasks. In other words,
it means that the task may be executed only if the previous tasks are already finished.

The task's option `deps` defines on which tasks the given task dependents.
The task is addressed by their IDs.
The task's option `deps` defines on which tasks the given task depends.
Each dependency is identified by a task ID.

The following example creates three tasks where the third task depends on the first two tasks.

@@ -143,6 +143,8 @@ command = [...]
deps = [1, 3] # <---- Dependency on tasks 1 and 3
```

Dependencies can also refer to tasks from [earlier submits](openjobs.md) in the same job.
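
For example, if an earlier submit to an open job already created task 1, a Job Definition File submitted later to the same job (e.g. via `hq job submit-file --job <job-id>`) may depend on it. A minimal sketch; the task ID and command are illustrative:

```toml
[[task]]
id = 2
command = ["sleep", "1"]
deps = [1]  # <---- Task 1 was created by an earlier submit to this job
```

Note that task IDs must be unique across all submits of a job; reusing an ID from an earlier submit is rejected.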

## Resource variants

More resource configurations may be defined for a task.
40 changes: 40 additions & 0 deletions tests/test_jobfile.py
@@ -391,6 +391,46 @@ def test_job_file_dependencies_with_cycle(hq_env: HqEnv, tmp_path):
hq_env.command(["job", "submit-file", "job.toml"], expect_fail="cycle")


def test_job_file_dependencies_from_previous_submit(hq_env: HqEnv, tmp_path):
hq_env.start_server()
tmp_path.joinpath("job1.toml").write_text(
"""
[[task]]
id = 1
command = ["sleep", "2"]
"""
)

hq_env.command(["job", "open"])
hq_env.command(["job", "submit-file", "--job", "1", "job1.toml"])

tmp_path.joinpath("job2.toml").write_text(
"""
[[task]]
id = 3
command = ["sleep", "1"]
deps = [1]

[[task]]
id = 5
command = ["sleep", "1"]
deps = [1, 3]
"""
)
hq_env.command(["job", "submit-file", "--job", "1", "job2.toml"])
hq_env.command(["job", "close", "1"])

table = hq_env.command(["task", "info", "1", "1"], as_table=True)
table.check_row_value("Dependencies", "")
table = hq_env.command(["task", "info", "1", "3"], as_table=True)
table.check_row_value("Dependencies", "1")
table = hq_env.command(["task", "info", "1", "5"], as_table=True)
table.check_row_value("Dependencies", "1,3")

hq_env.start_worker()
wait_for_job_state(hq_env, 1, "FINISHED")


def test_job_file_attach(hq_env: HqEnv, tmp_path):
hq_env.start_server()
hq_env.command(["job", "open"])