-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathcluster.R
125 lines (115 loc) · 4.87 KB
/
cluster.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
### Submit/wait for jobs in batch clusters.

## Ask SGE whether a job is still known to the scheduler.  `qstat -j <id>`
## exits non-zero once the job has left the queue; the caller uses the
## returned exit status in a boolean context, so non-zero means "finished".
sge.job.finished <- function(jobid)
{
  command <- paste0("qstat -j ", jobid)
  system(command, ignore.stdout = TRUE, ignore.stderr = TRUE,
         intern = FALSE, wait = TRUE)
}
## Ask PBS whether a job is still queued.  `qstat <id>` exits non-zero when
## the job is no longer in the queue, which the caller treats as "finished".
pbs.job.finished <- function(jobid)
{
  command <- paste0("qstat ", jobid)
  system(command, ignore.stdout = TRUE, ignore.stderr = TRUE,
         intern = FALSE, wait = TRUE)
}
## Ask Torque whether a job has completed.
torque.job.finished <- function(jobid)
{
  qstat_out <- suppressWarnings(system2("qstat", jobid, stdout = TRUE, stderr = TRUE))
  ## 1. A non-zero exit status means the job id is no longer known to the
  ## queue.  Torque only keeps completed jobs in the queue listing for a
  ## limited time (e.g. a short job submitted before a long one disappears
  ## from the listing well before the long one ends), so once qstat fails we
  ## assume the job was successfully submitted, ran, and has already been
  ## purged from the listing -- i.e. it is finished.
  exit_status <- attr(qstat_out, "status")
  if (!is.null(exit_status))
    return(TRUE)
  ## 2. qstat succeeded, so parse its listing: the state column shows 'C'
  ## once a job has terminated and its output files can be processed; any
  ## other state (queued, running, exiting, ...) means it is still in flight.
  any(grepl(paste0(jobid, ".*\\sC\\s"), qstat_out))
}
## Ask SLURM whether a job has completed.
slurm.job.finished <- function(jobid)
{
  listing <- suppressWarnings(system2("squeue", c("-j", jobid, "--noheader"),
                                      stdout = TRUE, stderr = TRUE))
  ## squeue fails either when the job has terminated or when it never
  ## existed; both cases count as finished here.
  if (!is.null(attr(listing, "status")))
    return(TRUE)
  ## Unlike Torque, squeue may also succeed while no longer listing a job
  ## that completed, so the job is finished exactly when its id is absent
  ## from the output.
  still_listed <- any(grepl(paste0("\\s", jobid, "\\s"), listing))
  !still_listed
}
## Ask HTCondor whether a job has left the queue.
##
## Runs `condor_q <jobid>` and searches its output for the job id.  For
## consistency with torque.job.finished() and slurm.job.finished(), a
## non-zero exit status of condor_q is also treated as "finished": the job
## is not in the queue anymore.  (Previously the captured stderr of a failed
## condor_q was grepped, which only accidentally produced the right answer.)
htcondor.job.finished <- function(jobid)
{
  output <- suppressWarnings(system2("condor_q", jobid, stdout = TRUE, stderr = TRUE))
  # condor_q itself failed => the job cannot be in the queue.
  if (!is.null(attr(output, "status"))) return(TRUE)
  # Check if job is still in the queue, otherwise it is considered finished.
  !any(grepl(paste0("ID:\\s", jobid), output))
}
## Launch a job with qsub and return its jobID. This function does not
## call qsub directly, but instead targetRunner should be a script that
## invokes qsub and returns a jobID.
target_runner_qsub <- function(experiment, scenario)
{
  debugLevel <- scenario$debugLevel
  res <- run_target_runner(experiment, scenario)
  outputRaw <- res$output$output
  err.msg <- res$output$error
  jobID <- NULL
  if (is.null(err.msg)) {
    # We cannot use parse.output because that tries to convert to numeric.
    if (debugLevel >= 2)
      cat(outputRaw, sep = "\n")
    # strsplit crashes on character(0), so only split non-empty output;
    # otherwise jobID stays NULL.
    if (length(outputRaw) > 0)
      jobID <- strsplit(trim(outputRaw), "[[:space:]]+")[[1]]
    # A well-behaved targetRunner prints exactly one token: the job id.
    if (length(jobID) != 1) {
      err.msg <- "The output of targetRunner should be only the jobID!"
      jobID <- NULL
    }
  }
  list(jobID = jobID, error = err.msg, outputRaw = outputRaw,
       call = paste(res$cmd, res$args))
}
## Submit the experiments in X to a batch cluster in chunks and wait until
## every submitted job has finished.
##
## X         : list of experiments, each passed to exec_target_runner().
## scenario  : must provide $batchmode (sge/pbs/torque/slurm/htcondor),
##             $parallel (max jobs submitted at once; < 1 means no limit)
##             and $debugLevel.
## poll.time : seconds between successive job-status polls.
##
## Returns the list of target-runner outputs for ALL elements of X.
## (Previously `output` was overwritten on every chunk iteration, so only
## the results of the last chunk were returned.)
cluster_lapply <- function(X, scenario, poll.time = 2)
{
  debugLevel <- scenario$debugLevel
  cluster.job.finished <-
    switch(scenario$batchmode,
           sge = sge.job.finished,
           pbs = pbs.job.finished,
           torque = torque.job.finished,
           slurm = slurm.job.finished,
           htcondor = htcondor.job.finished,
           irace_error ("Invalid value of scenario$batchmode = ", scenario$batchmode))
  # Parallel controls how many jobs we send at once. Some clusters have low
  # limits.
  ## FIXME: It would be better to submit up to the limit, then one by one as jobs finish.
  chunksize <- scenario$parallel
  if (chunksize < 1L) {
    # Non-positive means "no limit": submit everything in one chunk.
    chunksize <- length(X)
  }
  chunks <- split(X, ceiling(seq_along(X) / chunksize))
  # Accumulate the results of every chunk so that all of them are returned.
  output <- vector("list", 0L)
  for (chunk in chunks) {
    if (debugLevel >= 1) {
      irace_note ("Sending ", length(chunk), " / ", length(X), " jobs\n")
    }
    chunk_output <- lapply(chunk, exec_target_runner, scenario = scenario,
                           target_runner = target_runner_qsub)
    # Failed submissions have a NULL jobID; unlist() drops those so we only
    # poll jobs that were actually queued.
    jobIDs <- unlist(lapply(chunk_output, "[[", "jobID"))
    ## Wait for cluster jobs to finish.
    if (length(jobIDs) > 0L && debugLevel >= 1L) {
      irace_note("Waiting for jobs ('.' == ", poll.time, " s) ")
    }
    for (jobID in jobIDs) {
      while (!cluster.job.finished(jobID)) {
        if (debugLevel >= 1) { cat(".") }
        Sys.sleep(poll.time)
      }
      if (debugLevel >= 1) {
        cat("\n")
        irace_note ("DONE (", jobID, ")\n")
      }
    }
    output <- c(output, chunk_output)
  }
  output
}