Skip to content

[release-1.12] Revert "Scheduler: Use a "scheduler" task for thread sleep (#57544)" #58764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: release-1.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 13 additions & 37 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1145,16 +1145,6 @@ function throwto(t::Task, @nospecialize exc)
return try_yieldto(identity)
end

@inline function wait_forever()
while true
wait()
end
end

const get_sched_task = OncePerThread{Task}() do
Task(wait_forever)
end

function ensure_rescheduled(othertask::Task)
ct = current_task()
W = workqueue_for(Threads.threadid())
Expand Down Expand Up @@ -1191,39 +1181,25 @@ end

checktaskempty = Partr.multiq_check_empty

@noinline function poptask(W::StickyWorkqueue)
task = trypoptask(W)
if !(task isa Task)
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
end
set_next_task(task)
nothing
end

function wait()
ct = current_task()
# [task] user_time -yield-or-done-> wait_time
record_running_time!(ct)
# let GC run
GC.safepoint()
# check for libuv events
process_events()

# get the next task to run
result = nothing
have_result = false
W = workqueue_for(Threads.threadid())
task = trypoptask(W)
if !(task isa Task)
# No tasks to run; switch to the scheduler task to run the
# thread sleep logic.
sched_task = get_sched_task()
if ct !== sched_task
result = yieldto(sched_task)
have_result = true
else
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any),
trypoptask, W, checktaskempty)
end
end
# We may have already switched tasks (via the scheduler task), so
# only switch if we haven't.
if !have_result
@assert task isa Task
set_next_task(task)
result = try_yieldto(ensure_rescheduled)
end
poptask(W)
result = try_yieldto(ensure_rescheduled)
process_events()
# return when we come out of the queue
return result
end

Expand Down
4 changes: 3 additions & 1 deletion stdlib/Sockets/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -547,12 +547,14 @@ end
fetch(r)
end

let addr = Sockets.InetAddr(ip"192.0.2.5", 4444)
let addr = Sockets.InetAddr(ip"127.0.0.1", 4444)
srv = listen(addr)
s = Sockets.TCPSocket()
Sockets.connect!(s, addr)
r = @async close(s)
@test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s)
fetch(r)
close(srv)
end
end

Expand Down
5 changes: 3 additions & 2 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,15 @@ end
cb = first(async.cond.waitq)
@test isopen(async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test isempty(Base.Workqueue)
Base.process_events() # schedule event
Sys.iswindows() && Base.process_events() # schedule event (windows?)
@test length(Base.Workqueue) == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test tc[] == 0
yield() # consume event
@test tc[] == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
Base.process_events()
Sys.iswindows() && Base.process_events() # schedule event (windows?)
yield() # consume event
@test tc[] == 2
Expand Down