Skip to content

Commit 460f465

Browse files
bjarthurstaticfloat
authored andcommitted
cluster manager fixes (#30172)
* kill workers which don't launch properly * don't emit spurious error messages * document how to asynchronously launch workers
1 parent 5fd4d1c commit 460f465

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

stdlib/Distributed/src/cluster.jl

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,11 @@ function redirect_worker_output(ident, stream)
241241
end
242242
end
243243

244+
struct LaunchWorkerError <: Exception
245+
msg::String
246+
end
247+
248+
Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg)
244249

245250
# The default TCP transport relies on the worker listening on a free
246251
# port available and printing its bind address and port.
@@ -272,7 +277,7 @@ function read_worker_host_port(io::IO)
272277

273278
conninfo = fetch(readtask)
274279
if isempty(conninfo) && !isopen(io)
275-
error("Unable to read host:port string from worker. Launch command exited with error?")
280+
throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?"))
276281
end
277282

278283
ntries -= 1
@@ -286,13 +291,13 @@ function read_worker_host_port(io::IO)
286291
end
287292
close(io)
288293
if ntries > 0
289-
error("Timed out waiting to read host:port string from worker.")
294+
throw(LaunchWorkerError("Timed out waiting to read host:port string from worker."))
290295
else
291-
error("Unexpected output from worker launch command. Host:port string not found.")
296+
throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found."))
292297
end
293298
finally
294299
for line in leader
295-
println("\tFrom failed worker startup:\t", line)
300+
println("\tFrom worker startup:\t", line)
296301
end
297302
end
298303
end
@@ -354,6 +359,34 @@ the package `ClusterManagers.jl`.
354359
The number of seconds a newly launched worker waits for connection establishment from the
355360
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
356361
environment. Relevant only when using TCP/IP as transport.
362+
363+
To launch workers without blocking the REPL, or the containing function
364+
if launching workers programmatically, execute `addprocs` in its own task.
365+
366+
# Examples
367+
368+
```
369+
# On busy clusters, call `addprocs` asynchronously
370+
t = @async addprocs(...)
371+
```
372+
373+
```
374+
# Utilize workers as and when they come online
375+
if nprocs() > 1 # Ensure at least one new worker is available
376+
.... # perform distributed execution
377+
end
378+
```
379+
380+
```
381+
# Retrieve newly launched worker IDs, or any error messages
382+
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
383+
if nworkers() == N
384+
new_pids = fetch(t)
385+
else
386+
fetch(t)
387+
end
388+
end
389+
```
357390
"""
358391
function addprocs(manager::ClusterManager; kwargs...)
359392
init_multi()
@@ -499,9 +532,13 @@ function create_worker(manager, wconfig)
499532
local r_s, w_s
500533
try
501534
(r_s, w_s) = connect(manager, w.id, wconfig)
502-
catch
503-
deregister_worker(w.id)
504-
rethrow()
535+
catch ex
536+
try
537+
deregister_worker(w.id)
538+
kill(manager, w.id, wconfig)
539+
finally
540+
rethrow(ex)
541+
end
505542
end
506543

507544
w = Worker(w.id, r_s, w_s, manager; config=wconfig)

stdlib/Distributed/test/distributed_exec.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1149,7 +1149,7 @@ for (addp_testf, expected_errstr, env) in testruns
11491149
old_stdout = stdout
11501150
stdout_out, stdout_in = redirect_stdout()
11511151
stdout_txt = @async filter!(readlines(stdout_out)) do s
1152-
return !startswith(s, "\tFrom failed worker startup:\t")
1152+
return !startswith(s, "\tFrom worker startup:\t")
11531153
end
11541154
try
11551155
withenv(env...) do

0 commit comments

Comments
 (0)