Skip to content

Commit

Permalink
Add a monitor to some detached Tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash authored and antoine-levitt committed May 9, 2021
1 parent ce27727 commit 4c056a1
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 52 deletions.
10 changes: 7 additions & 3 deletions doc/src/manual/asynchronous-programming.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ A channel can be visualized as a pipe, i.e., it has a write end and a read end :

# we can schedule `n` instances of `foo` to be active concurrently.
for _ in 1:n
@async foo()
errormonitor(@async foo())
end
```
* Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects
Expand Down Expand Up @@ -263,10 +263,10 @@ julia> function make_jobs(n)
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
@async do_work()
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # print out results
Expand All @@ -289,6 +289,10 @@ julia> @elapsed while n > 0 # print out results
0.029772311
```

Instead of `errormonitor(t)`, a more robust solution may be to use `bind(results, t)`, as that will
not only log any unexpected failures, but also force the associated resources to close and propagate
the exception everywhere.

## More task operations

Task operations are built on a low-level primitive called [`yieldto`](@ref).
Expand Down
2 changes: 1 addition & 1 deletion doc/src/manual/distributed-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ julia> function make_jobs(n)
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
Expand Down
12 changes: 6 additions & 6 deletions doc/src/manual/networking-and-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ Let's first create a simple server:
```julia-repl
julia> using Sockets
julia> @async begin
julia> errormonitor(@async begin
server = listen(2000)
while true
sock = accept(server)
println("Hello World\n")
end
end
end)
Task (runnable) @0x00007fd31dc11ae0
```

Expand Down Expand Up @@ -265,23 +265,23 @@ printed the message and waited for the next client. Reading and writing works in
To see this, consider the following simple echo server:

```julia-repl
julia> @async begin
julia> errormonitor(@async begin
server = listen(2001)
while true
sock = accept(server)
@async while isopen(sock)
write(sock, readline(sock, keep=true))
end
end
end
end)
Task (runnable) @0x00007fd31dc12e60
julia> clientside = connect(2001)
TCPSocket(RawFD(28) open, 0 bytes waiting)
julia> @async while isopen(clientside)
julia> errormonitor(@async while isopen(clientside)
write(stdout, readline(clientside, keep=true))
end
end)
Task (runnable) @0x00007fd31dc11870
julia> println(clientside,"Hello World from the Echo Server")
Expand Down
2 changes: 2 additions & 0 deletions doc/src/manual/running-external-programs.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ wait(writer)
fetch(reader)
```

(Commonly, `reader` is not run as a separate task at all, since we immediately `fetch` it anyway.)

### Complex Example

The combination of a high-level programming language, a first-class command abstraction, and automatic
Expand Down
12 changes: 7 additions & 5 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
@async exec_conn_func(w)
t = @async exec_conn_func(w)
else
# route request via node 1
@async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
end
end
Expand Down Expand Up @@ -242,10 +243,10 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
else
sock = listen(interface, LPROC.bind_port)
end
@async while isopen(sock)
errormonitor(@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end
end)
print(out, "julia_worker:") # print header
print(out, "$(string(LPROC.bind_port))#") # print port
print(out, LPROC.bind_addr)
Expand Down Expand Up @@ -274,7 +275,7 @@ end


function redirect_worker_output(ident, stream)
@async while !eof(stream)
t = @async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand All @@ -284,6 +285,7 @@ function redirect_worker_output(ident, stream)
println(" From worker $(ident):\t$line")
end
end
errormonitor(t)
end

struct LaunchWorkerError <: Exception
Expand Down
3 changes: 2 additions & 1 deletion stdlib/Distributed/src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
@async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
end

function make_preduce_body(var, body)
Expand Down
19 changes: 6 additions & 13 deletions stdlib/Distributed/src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,15 @@ default_addprocs_params(::SSHManager) =
function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
# Launch one worker on each unique host in parallel. Additional workers are launched later.
# Wait for all launches to complete.
launch_tasks = Vector{Any}(undef, length(manager.machines))

for (i, (machine, cnt)) in enumerate(manager.machines)
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
launch_tasks[i] = @async try
launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
end
@async try
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
end
end
end

for t in launch_tasks
wait(t::Task)
end

notify(launch_ntfy)
end

Expand Down
26 changes: 12 additions & 14 deletions stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
@async run_work_thunk(rv, thunk)
errormonitor(@async run_work_thunk(rv, thunk))
return rv
end
end
Expand Down Expand Up @@ -111,7 +111,7 @@ end

## message event handlers ##
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
@async process_tcp_streams(r_stream, w_stream, incoming)
errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
end

function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
Expand Down Expand Up @@ -141,7 +141,7 @@ Julia version number to perform the authentication handshake.
See also [`cluster_cookie`](@ref).
"""
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
@async message_handler_loop(r_stream, w_stream, incoming)
errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
end

function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
Expand Down Expand Up @@ -274,7 +274,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
end
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
@async begin
errormonitor(@async begin
v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false)
if isa(v, SyncTake)
try
Expand All @@ -285,18 +285,20 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
else
deliver_result(w_stream, :call_fetch, header.notify_oid, v)
end
end
nothing
end)
end

function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
@async begin
errormonitor(@async begin
rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
end
nothing
end)
end

function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)
errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true))
end

function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
Expand Down Expand Up @@ -330,8 +332,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
lazy = msg.lazy
PGRP.lazy = lazy

wait_tasks = Task[]
for (connect_at, rpid) in msg.other_workers
@sync for (connect_at, rpid) in msg.other_workers
wconfig = WorkerConfig()
wconfig.connect_at = connect_at

Expand All @@ -340,14 +341,11 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
# The constructor registers the object with a global registry.
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
else
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
push!(wait_tasks, t)
@async connect_to_peer(cluster_manager, rpid, wconfig)
end
end
end

for wt in wait_tasks; Base.wait(wt); end

send_connection_hdr(controller, false)
send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid()))
end
Expand Down
6 changes: 3 additions & 3 deletions stdlib/Distributed/src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ or to use a local [`Channel`](@ref) as a proxy:
```julia
p = 1
f = Future(p)
@async put!(f, remotecall_fetch(long_computation, p))
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not block
```
"""
Expand Down Expand Up @@ -249,10 +249,10 @@ end

const any_gc_flag = Condition()
function start_gc_msgs_task()
@async while true
errormonitor(@async while true
wait(any_gc_flag)
flush_gc_msgs()
end
end)
end

function send_del_client(rr)
Expand Down
6 changes: 2 additions & 4 deletions stdlib/REPL/src/LineEdit.jl
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ function beep(s::PromptState, duration::Real=options(s).beep_duration,
isinteractive() || return # some tests fail on some platforms
s.beeping = min(s.beeping + duration, maxduration)
let colors = Base.copymutable(colors)
@async begin
errormonitor(@async begin
trylock(s.refresh_lock) || return
try
orig_prefix = s.p.prompt_prefix
Expand All @@ -198,12 +198,10 @@ function beep(s::PromptState, duration::Real=options(s).beep_duration,
s.p.prompt_prefix = orig_prefix
refresh_multi_line(s, beeping=true)
s.beeping = 0.0
catch e
Base.showerror(stdout, e, catch_backtrace())
finally
unlock(s.refresh_lock)
end
end
end)
end
nothing
end
Expand Down
2 changes: 2 additions & 0 deletions stdlib/REPL/src/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,12 @@ function run_repl(repl::AbstractREPL, @nospecialize(consumer = x -> nothing); ba
end
if backend_on_current_task
t = @async run_frontend(repl, backend_ref)
errormonitor(t)
Base._wait2(t, cleanup)
start_repl_backend(backend, consumer)
else
t = @async start_repl_backend(backend, consumer)
errormonitor(t)
Base._wait2(t, cleanup)
run_frontend(repl, backend_ref)
end
Expand Down
5 changes: 3 additions & 2 deletions stdlib/REPL/test/repl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ for prompt = ["TestΠ", () -> randstring(rand(1:10))]
# In the future if we want we can add a test that the right object
# gets displayed by intercepting the display
repl.specialdisplay = REPL.REPLDisplay(repl)
@async write(devnull, stdout_read) # redirect stdout to devnull so we drain the output pipe

errormonitor(@async write(devnull, stdout_read)) # redirect stdout to devnull so we drain the output pipe

repl.interface = REPL.setup_interface(repl)
repl_mode = repl.interface.modes[1]
Expand Down Expand Up @@ -1252,7 +1253,7 @@ end
# AST transformations (softscope, Revise, OhMyREPL, etc.)
@testset "AST Transformation" begin
backend = REPL.REPLBackend()
@async REPL.start_repl_backend(backend)
errormonitor(@async REPL.start_repl_backend(backend))
put!(backend.repl_channel, (:(1+1), false))
reply = take!(backend.response_channel)
@test reply == Pair{Any, Bool}(2, false)
Expand Down

0 comments on commit 4c056a1

Please sign in to comment.