Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 15 additions & 73 deletions src/WorkerUtilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -355,40 +355,12 @@ end
return
end

# mostly copied from Base definitions in task.jl and threadingconstructs.jl
# but adapted to wrap interpolated mutable arguments in WeakRef
function _lift_one_interp!(e)
letargs = Any[] # store the new gensymed arguments
unwrapargs = Any[]
_lift_one_interp_helper(e, false, letargs, unwrapargs) # Start out _not_ in a quote context (false)
letargs, unwrapargs
end
_lift_one_interp_helper(v, _, _, _) = v
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs, unwrapargs)
if expr.head === :$
if in_quote_context # This $ is simply interpolating out of the quote
# Now, we're out of the quote, so any _further_ $ is ours.
in_quote_context = false
else
letarg = gensym()
newarg = gensym()
push!(letargs, :($(esc(letarg)) = ismutable($(esc(expr.args[1]))) ? WeakRef($(esc(expr.args[1]))) : $(esc(expr.args[1]))))
push!(unwrapargs, :($newarg = $unwrap($letarg)))
return newarg # Don't recurse into the lifted $() exprs
end
elseif expr.head === :quote
in_quote_context = true # Don't try to lift $ directly out of quotes
elseif expr.head === :macrocall
return expr # Don't recur into macro calls, since some other macros use $
end
for (i,e) in enumerate(expr.args)
expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs, unwrapargs)
end
expr
function clear_current_task()
current_task().storage = nothing
current_task().code = nothing
return
end

unwrap(@nospecialize(val)) = val isa WeakRef ? val.value : val

"""
@wkspawn [:default|:interactive] expr

Expand All @@ -407,48 +379,18 @@ references to these arguments won't prevent them from being garbage collected
once the `Task` has finished running.
"""
macro wkspawn(args...)
tpid = Int8(0)
na = length(args)
if na == 2
ttype, ex = args
if ttype isa QuoteNode
ttype = ttype.value
elseif ttype isa Symbol
# TODO: allow unquoted symbols
ttype = nothing
end
if ttype === :interactive
tpid = Int8(1)
elseif ttype !== :default
throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
end
elseif na == 1
ex = args[1]
else
throw(ArgumentError("wrong number of arguments in @spawn"))
end

letargs, unwrapargs = _lift_one_interp!(ex)
_ex = quote
$(unwrapargs...)
$ex
end
thunk = esc(:(()->($_ex)))
var = esc(Base.sync_varname)
quote
let $(letargs...)
local task = Task($thunk)
task.sticky = false
@static if isdefined(Base.Threads, :maxthreadid)
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
end
if $(Expr(:islocal, var))
put!($var, task)
end
schedule(task)
task
end
e = args[end]
expr = quote
ret = $e
$(clear_current_task)()
ret
end
@static if isdefined(Base.Threads, :maxthreadid)
q = esc(:(Threads.@spawn $(args[1:end-1]...) $expr))
else
q = esc(:(Threads.@spawn $expr))
end
return q
end

end # module
84 changes: 43 additions & 41 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -208,46 +208,48 @@ using Test, WorkerUtilities
@test !islocked(rw)
end

end # @testset "WorkerUtilities"

# running these in a @testset doesn't work for some reason??
# @testset "@wkspawn" begin
# basics
@test fetch(@wkspawn(1 + 1)) == 2

if isdefined(Base.Threads, :maxthreadid)
# interactive threadpool
@test fetch(@wkspawn(:interactive, 1 + 1)) == 2
end

# show incorrect behavior
ref = Ref(10)
ansref = Ref(0)
wkref = WeakRef(ref)
t = let ref=ref
Threads.@spawn begin
ansref[] = $ref[]
end
end
wait(t)
@test ansref[] == 10
t = nothing; ref = nothing; GC.gc(true); GC.gc(true); GC.gc(true)
# there should be no program references to ref, and 3 GC calls
# should have collected it, but it's still alive
@test wkref.value.x == 10

# and now with @wkspawn
ref = Ref(10)
ansref = Ref(0)
wkref = WeakRef(ref)
t = let ref=ref
@wkspawn begin
ansref[] = $ref[]
end
end
wait(t)
@test ansref[] == 10
t = nothing; ref = nothing; GC.gc(true); GC.gc(true); GC.gc(true)
@show wkref
# correctly GCed
@test wkref.value === nothing
# @testset "@wkspawn" begin
# basics
@test fetch(@wkspawn(1 + 1)) == 2

if isdefined(Base.Threads, :maxthreadid)
# interactive threadpool
@test fetch(@wkspawn(:interactive, 1 + 1)) == 2
end

# show incorrect behavior
ref = Ref(10)
ansref = Ref(0)
wkref = WeakRef(ref)
t = let ref=ref
Threads.@spawn begin
ansref[] = $ref[]
end
end
wait(t)
@test ansref[] == 10
t = nothing; ref = nothing; GC.gc(true); GC.gc(true); GC.gc(true)
# there should be no program references to ref, and 3 GC calls
# should have collected it, but it's still alive
@test wkref.value.x == 10

# and now with @wkspawn
ref = Ref(10)
ansref = Ref(0)
wkref = WeakRef(ref)
t = let ref=ref
@wkspawn begin
ansref[] = $ref[]
end
end
wait(t)
@test ansref[] == 10
t = nothing; ref = nothing; GC.gc(true); GC.gc(true); GC.gc(true)
@show wkref
# correctly GCed
@test wkref.value === nothing
# end

# end # @testset "WorkerUtilities"