Skip to content

Commit 044f2ca

Browse files
committed
Implement asymmetric barriers
1 parent 7b52667 commit 044f2ca

File tree

4 files changed

+99
-30
lines changed

4 files changed

+99
-30
lines changed

base/asyncevent.jl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,9 @@ function _trywait(t::Union{Timer, AsyncCondition})
199199
return set
200200
end
201201

202-
cancel_wait!(t::Union{Timer, AsyncCondition}, task::Task) =
203-
cancel_wait!(t.cond, task, false; waitee=t)
202+
cancel_wait!(t::Union{Timer, AsyncCondition}, @nospecialize(creq)) = false
203+
cancel_wait!(t::Union{Timer, AsyncCondition}, task::Task, @nospecialize(creq)) =
204+
cancel_wait!(t.cond, task, creq, false; waitee=t)
204205

205206
function wait(t::Union{Timer, AsyncCondition})
206207
ok = _trywait(t)

base/condition.jl

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,35 +142,62 @@ Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notif
142142
If the keyword `first` is set to `true`, the waiter will be put _first_
143143
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
144144
"""
145-
function wait(c::GenericCondition; first::Bool=false, waitee=c)
145+
function wait(c::GenericCondition; first::Bool=false, waitee=c, expected_cancellation=nothing)
146146
ct = current_task()
147147
_wait2(c, ct, waitee, first)
148+
149+
@label before_barrier
150+
# Synchronize with atomic_fence_heavy in cancel!
151+
Threads.atomic_fence_light()
152+
# We need to check if we were cancelled and should not suspend if we were.
153+
# The fencing above ensures that we either see the cancellation request
154+
# or the cancelling task will call cancel_wait! to wake us again.
155+
cr = cancellation_request()
156+
if cr !== expected_cancellation
157+
if cr === CANCEL_REQUEST_YIELD
158+
# We are about to yield anyway, so we can acknowledge the cancellation now.
159+
# However, for the integrity of the cancellation_request synchronization,
160+
# we must revisit the barrier above and re-check the cancellation request
161+
@atomicreplace :acquire_release :monotonic ct.cancellation_request cr => nothing
162+
@goto before_barrier
163+
else
164+
Base.list_deletefirst!(waitqueue(c), ct)
165+
return invokelatest(cancel_wait!, waitee, cr)
166+
end
167+
end
148168
token = unlockall(c.lock)
149-
try
150-
return wait()
169+
170+
ret = try
171+
wait()
151172
catch
152-
q = ct.queue; q === c && Base.list_deletefirst!(waitqueue(c), ct)
153-
rethrow()
154-
finally
155173
relockall(c.lock, token)
174+
# This cleans up our entry in the waitqueue if we were resumed from an
175+
# unexpected `throwto`. Modern code should generally avoid this pattern.
176+
q = ct.queue; q === waitee && Base.list_deletefirst!(waitqueue(c), ct)
177+
rethrow()
156178
end
179+
180+
relockall(c.lock, token)
181+
return ret
182+
end
183+
184+
function cancel_wait!(c::GenericCondition, creq; waitee = c)
185+
throw(creq)
157186
end
158187

159-
function cancel_wait!(c::GenericCondition, t::Task; waitee = c)
160-
@assert (@atomic :monotonic t.cancellation_request) !== nothing
188+
function cancel_wait!(c::GenericCondition, t::Task, @nospecialize(creq); waitee = c)
161189
lock(c)
162190
if t.queue !== waitee
163191
unlock(c)
164192
return false
165193
end
166194
Base.list_deletefirst!(ILLRef(waitqueue(c), waitee), t)
167-
schedule(t, conform_cancellation_request(t.cancellation_request), error=true)
195+
schedule(t, conform_cancellation_request(creq), error=true)
168196
unlock(c)
169197
return true
170198
end
171199

172-
function cancel_wait!(c::GenericCondition, t::Task, @nospecialize(val); waitee=c)
173-
@assert (@atomic :monotonic t.cancellation_request) !== nothing
200+
function cancel_wait!(c::GenericCondition, t::Task, @nospecialize(creq), @nospecialize(val); waitee=c)
174201
lock(c)
175202
if t.queue !== waitee
176203
unlock(c)

base/locks-mt.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,5 @@ end
108108
function islocked(l::AbstractSpinLock)
109109
return (@atomic :monotonic l.owned) != 0
110110
end
111+
112+
# Compact display of a spin lock: its concrete type followed by "(locked)" or
# "(unlocked)", as reported by `islocked`. The lock argument must be named so the
# body can refer to it; `ans` is not a binding provided by the method.
Base.show(io::IO, l::AbstractSpinLock) =
    print(io, typeof(l), "(", islocked(l) ? "locked" : "unlocked", ")")

base/task.jl

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -316,15 +316,19 @@ function _wait(t::Task; expected_cancellation = nothing)
316316
lock(donenotify)
317317
try
318318
while !istaskdone(t) && cancellation_request() === expected_cancellation
319-
wait(donenotify; waitee=t)
319+
wait(donenotify; waitee=t, expected_cancellation)
320320
end
321321
finally
322322
unlock(donenotify)
323323
end
324324
end
325325
nothing
326326
end
327-
cancel_wait!(waitee::Task, waiter::Task) = cancel_wait!(waitee.donenotify, waiter, nothing; waitee)
327+
328+
# We handle cancellation explicitly above - just suppress the error here
329+
cancel_wait!(waitee::Task, @nospecialize(creq)) = nothing
330+
cancel_wait!(waitee::Task, waiter::Task, @nospecialize(creq)) =
331+
cancel_wait!(waitee.donenotify, waiter, creq, nothing; waitee)
328332

329333
# have `waiter` wait for `t`
330334
function _wait2(t::Task, waiter::Task)
@@ -367,7 +371,7 @@ Throws a `ConcurrencyViolationError` if `t` is the currently running task, to pr
367371
"""
368372
function wait(t::Task; throw=true)
369373
_wait(t)
370-
cr = cancellation_request()
374+
cr = cancellation_request_or_yield()
371375
if cr !== nothing
372376
propagate_cancellation!(t, cr)
373377
end
@@ -611,7 +615,7 @@ function sync_end(c::Channel{Any})
611615
r = take!(c)
612616
if isa(r, Task)
613617
_wait(r)
614-
cr = cancellation_request()
618+
cr = cancellation_request_or_yield()
615619
if cr !== nothing
616620
return sync_cancel!(c, r, cr, @isdefined(c_ex) ? c_ex : CompositeException())
617621
end
@@ -1423,6 +1427,14 @@ end
14231427
throw(req)
14241428
end
14251429

1430+
function cancellation_request_raw()
1431+
ct = current_task()
1432+
req = @atomic :monotonic ct.cancellation_request
1433+
req === nothing && return req
1434+
req = @atomic :acquire ct.cancellation_request
1435+
return req
1436+
end
1437+
14261438
"""
14271439
cancellation_request()
14281440
@@ -1431,13 +1443,26 @@ cancellation has been requested. If a cancellation request is present, it is
14311443
loaded with acquire semantics.
14321444
"""
14331445
function cancellation_request()
1434-
ct = current_task()
1435-
req = @atomic :monotonic ct.cancellation_request
1436-
req === nothing && return req
1437-
cr = @atomic :acquire ct.cancellation_request
1446+
cr = cancellation_request_raw()
14381447
return conform_cancellation_request(cr)
14391448
end
14401449

1450+
"""
1451+
cancellation_request_or_yield()
1452+
1453+
Like [`cancellation_request`](@ref), but specifically handles CANCEL_REQUEST_YIELD
1454+
by calling yield internally and re-checking for cancellation requests.
1455+
"""
1456+
function cancellation_request_or_yield()
1457+
while true
1458+
_cr = cancellation_request_raw()
1459+
cr = conform_cancellation_request(_cr)
1460+
cr !== CANCEL_REQUEST_YIELD && return cr
1461+
@atomicreplace :sequentially_consistent :monotonic current_task().cancellation_request _cr => nothing
1462+
yield()
1463+
end
1464+
end
1465+
14411466
"""
14421467
Core.cancellation_point!()
14431468
@@ -1451,7 +1476,7 @@ Core.cancellation_point!
14511476
function cancel!(t::Task, crequest=CANCEL_REQUEST_SAFE)
14521477
# TODO: Raise task priority
14531478
@atomic :release t.cancellation_request = crequest
1454-
# TODO: SYS_membarrier() ?
1479+
Threads.atomic_fence_heavy()
14551480
# Special case: If the task hasn't started yet at this point, we want to set
14561481
# it up to cancel any waits, but we need to be a bit careful with concurrent
14571482
# starts of the task.
@@ -1465,15 +1490,29 @@ function cancel!(t::Task, crequest=CANCEL_REQUEST_SAFE)
14651490
end
14661491
return
14671492
end
1468-
# Try to interrupt the task if it's at a cancellation point (has reset_ctx set)
1493+
# Try to interrupt the task. The barrier above synchronizes with the establishment
1494+
# of a wait object and guarantees that either:
1495+
# 1. We have the wait object in t.queue, or
1496+
# 2. The task saw the cancellation and called (a different method of) cancel_wait!
1497+
# itself.
1498+
# Note that it is possible for both to be true, in which case the task wins
1499+
# and our call to cancel_wait! will no-op after acquiring the waitee lock.
1500+
#
1501+
# Additionally, if there is no wait object, either
1502+
# 1. The task is suspended, but not using our wait object protocol.
1503+
# In this case, cancellation will not succeed.
1504+
# 2. The task is running.
1505+
#
1506+
# We can't tell the difference, but we unconditionally try to send the cancellation
1507+
# signal. If a reset_ctx exists, this will cause the task to be interrupted.
14691508
tid = Threads.threadid(t)
1470-
if tid != 0
1471-
ccall(:jl_send_cancellation_signal, Cvoid, (Int16,), (tid - 1) % Int16)
1472-
end
1473-
while !istaskdone(t)
1509+
if !istaskdone(t)
14741510
waitee = t.queue
1475-
waitee === nothing && (yield(); continue)
1476-
invokelatest(cancel_wait!, waitee, t) && break
1511+
if waitee !== nothing
1512+
invokelatest(cancel_wait!, waitee, t, crequest)
1513+
elseif tid != 0
1514+
ccall(:jl_send_cancellation_signal, Cvoid, (Int16,), (tid - 1) % Int16)
1515+
end
14771516
end
14781517
if t.sticky
14791518
# If this task is sticky, it won't be able to run if the task currently
@@ -1494,7 +1533,7 @@ function cancel!(t::Task, crequest=CANCEL_REQUEST_SAFE)
14941533
end
14951534
end
14961535

1497-
function cancel_wait!(q::StickyWorkqueue, t::Task)
1536+
function cancel_wait!(q::StickyWorkqueue, t::Task, @nospecialize(creq))
14981537
# Tasks in the workqueue are runnable - we do not cancel the wait,
14991538
# but we do need to check whether it's in there
15001539
lock(q.lock)

0 commit comments

Comments
 (0)