Skip to content

Commit

Permalink
Merge pull request #30806 from JuliaLang/jn/task-ll
Browse files Browse the repository at this point in the history
worklist changes for partr (1/3)
  • Loading branch information
JeffBezanson authored Mar 19, 2019
2 parents b93fd23 + bd610a3 commit 41c3369
Show file tree
Hide file tree
Showing 19 changed files with 675 additions and 299 deletions.
3 changes: 2 additions & 1 deletion base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ include("env.jl")

# Scheduling
include("libuv.jl")
include("linked_list.jl")
include("event.jl")
include("task.jl")
include("threads.jl")
include("lock.jl")
include("task.jl")
include("weakkeydict.jl")

# Logging
Expand Down
4 changes: 2 additions & 2 deletions base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ eval(Core, :(LineInfoNode(@nospecialize(method), file::Symbol, line::Int, inline

Module(name::Symbol=:anonymous, std_imports::Bool=true) = ccall(:jl_f_new_module, Ref{Module}, (Any, Bool), name, std_imports)

function Task(@nospecialize(f), reserved_stack::Int=0)
return ccall(:jl_new_task, Ref{Task}, (Any, Int), f, reserved_stack)
function _Task(@nospecialize(f), reserved_stack::Int, completion_future)
return ccall(:jl_new_task, Ref{Task}, (Any, Any, Int), f, completion_future, reserved_stack)
end

# simple convert for use by constructors of types in Core
Expand Down
211 changes: 8 additions & 203 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ Abstract implementation of a condition object
for synchonizing tasks objects with a given lock.
"""
struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
waitq::InvasiveLinkedList{Task}
lock::L

GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
GenericCondition{L}() where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), l)
GenericCondition(l::AbstractLock) = new{typeof(l)}(InvasiveLinkedList{Task}(), l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
Expand Down Expand Up @@ -94,11 +94,10 @@ function wait(c::GenericCondition)
assert_havelock(c)
push!(c.waitq, ct)
token = unlockall(c.lock)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
list_deletefirst!(c.waitq, ct)
rethrow()
finally
relockall(c.lock, token)
Expand All @@ -118,16 +117,11 @@ notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false)
function notify(c::GenericCondition, @nospecialize(arg), all, error)
assert_havelock(c)
cnt = 0
if all
cnt = length(c.waitq)
for t in c.waitq
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
cnt = 1
while !isempty(c.waitq)
t = popfirst!(c.waitq)
schedule(t, arg, error=error)
cnt += 1
all || break
end
return cnt
end
Expand Down Expand Up @@ -161,195 +155,6 @@ This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-saf
const Condition = GenericCondition{AlwaysLockedST}


## scheduler and work queue

global const Workqueue = Task[]

function enq_work(t::Task)
t.state == :runnable || error("schedule: Task not runnable")
ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
push!(Workqueue, t)
t.state = :queued
return t
end

schedule(t::Task) = enq_work(t)

"""
schedule(t::Task, [val]; error=false)
Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
If a second argument `val` is provided, it will be passed to the task (via the return value of
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
the woken task.
# Examples
```jldoctest
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
```
"""
function schedule(t::Task, arg; error=false)
# schedule a task to be (re)started with the given value or exception
if error
t.exception = arg
else
t.result = arg
end
return enq_work(t)
end

# fast version of `schedule(t, arg); wait()`
function schedule_and_wait(t::Task, arg=nothing)
t.state == :runnable || error("schedule: Task not runnable")
if isempty(Workqueue)
return yieldto(t, arg)
else
t.result = arg
push!(Workqueue, t)
t.state = :queued
end
return wait()
end

"""
yield()
Switch to the scheduler to allow another scheduled task to run. A task that calls this
function is still runnable, and will be restarted immediately if there are no other runnable
tasks.
"""
yield() = (enq_work(current_task()); wait())

"""
yield(t::Task, arg = nothing)
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
immediately yields to `t` before calling the scheduler.
"""
function yield(t::Task, @nospecialize x = nothing)
t.state == :runnable || error("schedule: Task not runnable")
t.result = x
enq_work(current_task())
return try_yieldto(ensure_rescheduled, Ref(t))
end

"""
yieldto(t::Task, arg = nothing)
Switch to the given task. The first time a task is switched to, the task's function is
called with no arguments. On subsequent switches, `arg` is returned from the task's last
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
or scheduling in any way. Its use is discouraged.
"""
function yieldto(t::Task, @nospecialize x = nothing)
t.result = x
return try_yieldto(identity, Ref(t))
end

function try_yieldto(undo, reftask::Ref{Task})
try
ccall(:jl_switchto, Cvoid, (Any,), reftask)
catch
undo(reftask[])
rethrow()
end
ct = current_task()
exc = ct.exception
if exc !== nothing
ct.exception = nothing
throw(exc)
end
result = ct.result
ct.result = nothing
return result
end

# yield to a task, throwing an exception in it
function throwto(t::Task, @nospecialize exc)
t.exception = exc
return yieldto(t)
end

function ensure_rescheduled(othertask::Task)
ct = current_task()
if ct !== othertask && othertask.state == :runnable
# we failed to yield to othertask
# return it to the head of the queue to be scheduled later
pushfirst!(Workqueue, othertask)
othertask.state = :queued
end
if ct.state == :queued
# if the current task was queued,
# also need to return it to the runnable state
# before throwing an error
i = findfirst(t->t===ct, Workqueue)
i === nothing || deleteat!(Workqueue, i)
ct.state = :runnable
end
nothing
end

@noinline function poptask()
t = popfirst!(Workqueue)
if t.state != :queued
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
# can't throw here, because it's probably not the fault of the caller to wait
# and don't want to use print() here, because that may try to incur a task switch
ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
"\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n")
return
end
t.state = :runnable
return Ref(t)
end

function wait()
while true
if isempty(Workqueue)
c = process_events(true)
if c == 0 && eventloop() != C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
reftask = poptask()
if reftask !== nothing
result = try_yieldto(ensure_rescheduled, reftask)
process_events(false)
# return when we come out of the queue
return result
end
end
end
# unreachable
end

if Sys.iswindows()
pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
else
pause() = ccall(:pause, Cvoid, ())
end


## async event notifications

"""
Expand Down
Loading

0 comments on commit 41c3369

Please sign in to comment.