Skip to content

Commit

Permalink
Fix #2
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Apr 26, 2023
1 parent 10cf56a commit 7c01cc8
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 77 deletions.
2 changes: 0 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ Depends:
R (>= 4.0.0)
Imports:
crew (>= 0.1.0),
processx,
R6,
rlang,
utils
Suggests:
knitr (>= 1.30),
Expand Down
2 changes: 0 additions & 2 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ importFrom(crew,crew_assert)
importFrom(crew,crew_class_launcher)
importFrom(crew,crew_launcher)
importFrom(crew,crew_random_name)
importFrom(processx,run)
importFrom(rlang,abort)
importFrom(utils,globalVariables)
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# crew.cluster 0.0.2.9000 (development)


* Submit workers asynchronously (#2).
* Use `system2()` instead of `processx` to submit workers (#2).
* Add a `verbose` argument to the SGE launcher to optionally print `system2()` stdout and stderr.

# crew.cluster 0.0.2

Expand Down
4 changes: 3 additions & 1 deletion R/crew_controller_sge.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ crew_controller_sge <- function(
sge_cores = NULL,
sge_gpu = NULL,
sge_lines = NULL,
verbose = FALSE,
auto_scale = "demand"
) {
router <- crew::crew_router(
Expand Down Expand Up @@ -77,7 +78,8 @@ crew_controller_sge <- function(
sge_memory_gigabytes_limit = sge_memory_gigabytes_limit,
sge_cores = sge_cores,
sge_gpu = sge_gpu,
sge_lines = sge_lines
sge_lines = sge_lines,
verbose = verbose
)
controller <- crew::crew_controller(
router = router,
Expand Down
116 changes: 67 additions & 49 deletions R/crew_launcher_sge.R
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
#' added to the SGE job script just after the more common flags.
#' An example would be `sge_lines = "module load R"` if your SGE cluster
#' supports R through an environment module.
#' @param verbose Logical, whether to see console output and error messages
#' when submitting worker.
crew_launcher_sge <- function(
name = NULL,
seconds_launch = 60,
Expand All @@ -81,7 +83,8 @@ crew_launcher_sge <- function(
sge_memory_gigabytes_limit = NULL,
sge_cores = NULL,
sge_gpu = NULL,
sge_lines = NULL
sge_lines = NULL,
verbose = FALSE
) {
name <- as.character(name %|||% crew::crew_random_name())
launcher <- crew_class_launcher_sge$new(
Expand All @@ -108,7 +111,8 @@ crew_launcher_sge <- function(
sge_memory_gigabytes_limit = sge_memory_gigabytes_limit,
sge_cores = sge_cores,
sge_gpu = sge_gpu,
sge_lines = sge_lines
sge_lines = sge_lines,
verbose = verbose
)
launcher$validate()
launcher
Expand Down Expand Up @@ -146,6 +150,8 @@ crew_class_launcher_sge <- R6::R6Class(
sge_gpu = NULL,
#' @field sge_lines See [crew_launcher_sge()].
sge_lines = NULL,
#' @field verbose See [crew_launcher_sge()].
verbose = NULL,
#' @description SGE launcher constructor.
#' @return an SGE launcher object.
#' @param name See [crew_launcher_sge()].
Expand All @@ -172,6 +178,7 @@ crew_class_launcher_sge <- R6::R6Class(
#' @param sge_cores See [crew_launcher_sge()].
#' @param sge_gpu See [crew_launcher_sge()].
#' @param sge_lines See [crew_launcher_sge()].
#' @param verbose See [crew_launcher_sge()].
initialize = function(
name = NULL,
seconds_launch = NULL,
Expand All @@ -196,7 +203,8 @@ crew_class_launcher_sge <- R6::R6Class(
sge_memory_gigabytes_limit = NULL,
sge_cores = NULL,
sge_gpu = NULL,
sge_lines = NULL
sge_lines = NULL,
verbose = NULL
) {
super$initialize(
name = name,
Expand Down Expand Up @@ -224,51 +232,7 @@ crew_class_launcher_sge <- R6::R6Class(
self$sge_cores <- sge_cores
self$sge_gpu <- sge_gpu
self$sge_lines <- sge_lines
},
#' @description Launch a local process worker which will
#' dial into a socket.
#' @details The `call` argument is R code that will run to
#' initiate the worker. Together, the `launcher`, `worker`,
#' and `instance` arguments are useful for
#' constructing informative job names.
#' @return A handle object to allow the termination of the worker
#' later on.
#' @param call Text string with a namespaced call to [crew_worker()]
#' which will run in the worker and accept tasks.
#' @param launcher Character of length 1, name of the launcher.
#' @param worker Positive integer of length 1, index of the worker.
#' This worker index remains the same even when the current instance
#' of the worker exits and a new instance launches.
#' It is always between 1 and the maximum number of concurrent workers.
#' @param instance Character of length 1 to uniquely identify
#' the current instance of the worker.
launch_worker = function(call, launcher, worker, instance) {
name <- name_job(
launcher = launcher,
worker = worker,
instance = instance
)
lines <- c(
paste("#$ -N", name),
self$script(),
paste("R -e", shQuote(call))
)
script <- tempfile()
writeLines(text = lines, con = script)
on.exit(unlink(script))
processx::run(command = self$sge_qsub, args = script)
name
},
#' @description Terminate a local process worker.
#' @return `NULL` (invisibly).
#' @param handle A process handle object previously
#' returned by `launch_worker()`.
terminate_worker = function(handle) {
processx::run(
command = self$sge_qdel,
args = handle,
error_on_status = FALSE
)
self$verbose <- verbose
},
#' @description Validate the launcher.
#' @return `NULL` (invisibly). Throws an error if a field is invalid.
Expand Down Expand Up @@ -303,7 +267,8 @@ crew_class_launcher_sge <- R6::R6Class(
fields <- c(
"sge_cwd",
"sge_envvars",
"sge_log_join"
"sge_log_join",
"verbose"
)
for (field in fields) {
crew::crew_assert(
Expand Down Expand Up @@ -332,6 +297,59 @@ crew_class_launcher_sge <- R6::R6Class(
}
invisible()
},
#' @description Launch a local process worker which will
#' dial into a socket.
#' @details The `call` argument is R code that will run to
#' initiate the worker. Together, the `launcher`, `worker`,
#' and `instance` arguments are useful for
#' constructing informative job names.
#' @return A handle object to allow the termination of the worker
#' later on.
#' @param call Text string with a namespaced call to [crew_worker()]
#' which will run in the worker and accept tasks.
#' @param launcher Character of length 1, name of the launcher.
#' @param worker Positive integer of length 1, index of the worker.
#' This worker index remains the same even when the current instance
#' of the worker exits and a new instance launches.
#' It is always between 1 and the maximum number of concurrent workers.
#' @param instance Character of length 1 to uniquely identify
#' the current instance of the worker.
launch_worker = function(call, launcher, worker, instance) {
name <- name_job(
launcher = launcher,
worker = worker,
instance = instance
)
lines <- c(
paste("#$ -N", name),
self$script(),
paste("R -e", shQuote(call))
)
script <- name_script(name = name)
writeLines(text = lines, con = script)
system2(
command = self$sge_qsub,
args = shQuote(script),
stdout = if_any(self$verbose, "", FALSE),
stderr = if_any(self$verbose, "", FALSE),
wait = FALSE
)
name
},
#' @description Terminate a local process worker.
#' @return `NULL` (invisibly).
#' @param handle A process handle object previously
#' returned by `launch_worker()`.
terminate_worker = function(handle) {
unlink(name_script(name = handle), force = TRUE)
system2(
command = self$sge_qdel,
args = shQuote(handle),
stdout = if_any(self$verbose, "", FALSE),
stderr = if_any(self$verbose, "", FALSE),
wait = FALSE
)
},
#' @description Generate the job script.
#' @details Includes everything except the worker-instance-specific
#' job name and the worker-instance-specific
Expand Down
2 changes: 0 additions & 2 deletions R/crew_package.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
#' @family help
#' @importFrom crew crew_assert crew_class_launcher crew_launcher
#' crew_random_name
#' @importFrom processx run
#' @importFrom R6 R6Class
#' @importFrom rlang abort
#' @importFrom utils globalVariables
NULL

Expand Down
6 changes: 5 additions & 1 deletion R/utils_names.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ name_job <- function(launcher, worker, instance) {
out <- paste(launcher, worker, instance, sep = "-")
alpha <- all(grepl(pattern = "^[[:alpha:]]", x = out))
if (!alpha) {
out <- paste0("sge-", out)
out <- paste0("worker-", out)
}
out
}

name_script <- function(name) {
file.path(tempdir(), paste0(name, ".sh"))
}
2 changes: 1 addition & 1 deletion inst/WORDLIST
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ CloudWatch
MERCHANTABILITY
NONINFRINGEMENT
Tidyselect
Bischel
Bischl
Surmann
Kubernetes
namespaced
Expand Down
35 changes: 20 additions & 15 deletions man/crew_class_launcher_sge.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions man/crew_controller_sge.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion man/crew_launcher_sge.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tests/sge/minimal.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ controller <- crew_controller_sge(
name = "my_workflow",
workers = 1L,
seconds_idle = 300,
sge_lines = paste0("module load R/", getRversion())
sge_lines = paste0("module load R/", getRversion()),
verbose = TRUE
)
controller$start()
controller$push(
Expand Down
8 changes: 7 additions & 1 deletion tests/testthat/test-utils_names.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
test_that("name_job()", {
expect_equal(name_job("a", "b", "c"), "a-b-c")
expect_equal(name_job("1", "b", "c"), "sge-1-b-c")
expect_equal(name_job("1", "b", "c"), "worker-1-b-c")
})

test_that("name_script()", {
name <- name_job("a", "b", "c")
script <- name_script(name)
expect_equal(script, file.path(tempdir(), "a-b-c.sh"))
})

0 comments on commit 7c01cc8

Please sign in to comment.