Skip to content

cluster manager fixes #30172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ function redirect_worker_output(ident, stream)
end
end

struct LaunchWorkerError <: Exception
msg::String
end

Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg)

# The default TCP transport relies on the worker listening on a free
# port available and printing its bind address and port.
Expand Down Expand Up @@ -272,7 +277,7 @@ function read_worker_host_port(io::IO)

conninfo = fetch(readtask)
if isempty(conninfo) && !isopen(io)
error("Unable to read host:port string from worker. Launch command exited with error?")
throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?"))
end

ntries -= 1
Expand All @@ -286,13 +291,13 @@ function read_worker_host_port(io::IO)
end
close(io)
if ntries > 0
error("Timed out waiting to read host:port string from worker.")
throw(LaunchWorkerError("Timed out waiting to read host:port string from worker."))
else
error("Unexpected output from worker launch command. Host:port string not found.")
throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found."))
end
finally
for line in leader
println("\tFrom failed worker startup:\t", line)
println("\tFrom worker startup:\t", line)
end
end
end
Expand Down Expand Up @@ -354,6 +359,34 @@ the package `ClusterManagers.jl`.
The number of seconds a newly launched worker waits for connection establishment from the
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
environment. Relevant only when using TCP/IP as transport.

To launch workers without blocking the REPL, or the containing function
if launching workers programmatically, execute `addprocs` in its own task.

# Examples

```
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
```

```
# Utilize workers as and when they come online
if nprocs() > 1 # Ensure at least one new worker is available
.... # perform distributed execution
end
```

```
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
```
"""
function addprocs(manager::ClusterManager; kwargs...)
init_multi()
Expand Down Expand Up @@ -499,9 +532,13 @@ function create_worker(manager, wconfig)
local r_s, w_s
try
(r_s, w_s) = connect(manager, w.id, wconfig)
catch
deregister_worker(w.id)
rethrow()
catch ex
try
deregister_worker(w.id)
kill(manager, w.id, wconfig)
finally
rethrow(ex)
end
end

w = Worker(w.id, r_s, w_s, manager; config=wconfig)
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ for (addp_testf, expected_errstr, env) in testruns
old_stdout = stdout
stdout_out, stdout_in = redirect_stdout()
stdout_txt = @async filter!(readlines(stdout_out)) do s
return !startswith(s, "\tFrom failed worker startup:\t")
return !startswith(s, "\tFrom worker startup:\t")
end
try
withenv(env...) do
Expand Down