Skip to content

Commit b4f3d15

Browse files
krynjuvtjnashtkf
authored andcommitted
[Distributed] Worker local race condition between put! and fetch for Futures (JuliaLang/julia#42339)
* add local_lock to Future, use it in fetch and put! * add corrections to the remote/clientref logic * add memory ordering guarantees * serialize a (unlocked) copy of the future to avoid problems with the lock Co-authored-by: Jameson Nash <vtjnash@gmail.com> Co-authored-by: Takafumi Arakaki <aka.tkf@gmail.com> (cherry picked from commit 4d17719)
1 parent e8af70c commit b4f3d15

File tree

2 files changed

+96
-28
lines changed

2 files changed

+96
-28
lines changed

src/remotecall.jl

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
2626
where::Int
2727
whence::Int
2828
id::Int
29-
v::Union{Some{Any}, Nothing}
29+
lock::ReentrantLock
30+
@atomic v::Union{Some{Any}, Nothing}
3031

3132
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) =
32-
(r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))
33+
(r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r))
3334

34-
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
35+
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances
3536
end
3637

3738
"""
@@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef)
6970
found = getkey(client_refs, r, nothing)
7071
if found !== nothing
7172
@assert r.where > 0
72-
if isa(r, Future) && found.v === nothing && r.v !== nothing
73-
# we have recd the value from another source, probably a deserialized ref, send a del_client message
74-
send_del_client(r)
75-
found.v = r.v
73+
if isa(r, Future)
74+
# this is only for copying the reference from Future to RemoteRef (just created)
75+
fv_cache = @atomic :acquire found.v
76+
rv_cache = @atomic :monotonic r.v
77+
if fv_cache === nothing && rv_cache !== nothing
78+
# we have recd the value from another source, probably a deserialized ref, send a del_client message
79+
send_del_client(r)
80+
@lock found.lock begin
81+
@atomicreplace found.v nothing => rv_cache
82+
end
83+
end
7684
end
7785
return found::typeof(r)
7886
end
@@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef)
9199
send_del_client_no_lock(r)
92100
else
93101
# send_del_client only if the reference has not been set
94-
r.v === nothing && send_del_client_no_lock(r)
95-
r.v = nothing
102+
v_cache = @atomic :monotonic r.v
103+
v_cache === nothing && send_del_client_no_lock(r)
104+
@atomic :monotonic r.v = nothing
96105
end
97106
r.where = 0
98107
finally
@@ -201,7 +210,8 @@ isready(f) # will not block
201210
```
202211
"""
203212
function isready(rr::Future)
204-
rr.v === nothing || return true
213+
v_cache = @atomic rr.v
214+
v_cache === nothing || return true
205215

206216
rid = remoteref_id(rr)
207217
return if rr.where == myid()
@@ -354,26 +364,33 @@ end
354364

355365
channel_type(rr::RemoteChannel{T}) where {T} = T
356366

357-
serialize(s::ClusterSerializer, f::Future) = serialize(s, f, f.v === nothing)
358-
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
359-
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
360-
if addclient
367+
function serialize(s::ClusterSerializer, f::Future)
368+
v_cache = @atomic f.v
369+
if v_cache === nothing
361370
p = worker_id_from_socket(s.io)
362-
(p !== rr.where) && send_add_client(rr, p)
371+
(p !== f.where) && send_add_client(f, p)
363372
end
373+
fc = Future((f.where, f.whence, f.id, v_cache)) # copy to be used for serialization (contains a reset lock)
374+
invoke(serialize, Tuple{ClusterSerializer, Any}, s, fc)
375+
end
376+
377+
function serialize(s::ClusterSerializer, rr::RemoteChannel)
378+
p = worker_id_from_socket(s.io)
379+
(p !== rr.where) && send_add_client(rr, p)
364380
invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
365381
end
366382

367383
function deserialize(s::ClusterSerializer, t::Type{<:Future})
368-
f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
369-
f2 = Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table
384+
fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy
385+
f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table
370386

371387
# 1) send_add_client() is not executed when the ref is being serialized
372388
# to where it exists, hence do it here.
373389
# 2) If we have received a 'fetch'ed Future or if the Future ctor found an
374390
# already 'fetch'ed instance in client_refs (Issue #25847), we should not
375391
# track it in the backing RemoteValue store.
376-
if f2.where == myid() && f2.v === nothing
392+
f2v_cache = @atomic f2.v
393+
if f2.where == myid() && f2v_cache === nothing
377394
add_client(remoteref_id(f2), myid())
378395
end
379396
f2
@@ -570,7 +587,7 @@ end
570587
571588
Wait for a value to become available for the specified [`Future`](@ref).
572589
"""
573-
wait(r::Future) = (r.v !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)
590+
wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)
574591

575592
"""
576593
wait(r::RemoteChannel, args...)
@@ -587,11 +604,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
587604
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
588605
"""
589606
function fetch(r::Future)
590-
r.v !== nothing && return something(r.v)
591-
v = call_on_owner(fetch_ref, r)
592-
r.v = Some(v)
607+
v_cache = @atomic r.v
608+
v_cache !== nothing && return something(v_cache)
609+
610+
if r.where == myid()
611+
rv, v_cache = @lock r.lock begin
612+
v_cache = @atomic :monotonic r.v
613+
rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing
614+
rv, v_cache
615+
end
616+
617+
if v_cache !== nothing
618+
return something(v_cache)
619+
else
620+
v_local = fetch(rv.c)
621+
end
622+
else
623+
v_local = call_on_owner(fetch_ref, r)
624+
end
625+
626+
v_cache = @atomic r.v
627+
628+
if v_cache === nothing # call_on_owner case
629+
v_old, status = @lock r.lock begin
630+
@atomicreplace r.v nothing => Some(v_local)
631+
end
632+
# status == true - when value obtained through call_on_owner
633+
# status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
634+
# why? local put! performs caching and putting into channel under r.lock
635+
636+
# for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
637+
v_cache = status ? v_local : v_old
638+
end
639+
593640
send_del_client(r)
594-
v
641+
something(v_cache)
595642
end
596643

597644
fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
@@ -615,12 +662,30 @@ A `put!` on an already set `Future` throws an `Exception`.
615662
All asynchronous remote calls return `Future`s and set the
616663
value to the return value of the call upon completion.
617664
"""
618-
function put!(rr::Future, v)
619-
rr.v !== nothing && error("Future can be set only once")
620-
call_on_owner(put_future, rr, v, myid())
621-
rr.v = Some(v)
622-
rr
665+
function put!(r::Future, v)
666+
if r.where == myid()
667+
rid = remoteref_id(r)
668+
rv = lookup_ref(rid)
669+
isready(rv) && error("Future can be set only once")
670+
@lock r.lock begin
671+
put!(rv, v) # this notifies the tasks waiting on the channel in fetch
672+
set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
673+
end
674+
del_client(rid, myid())
675+
else
676+
@lock r.lock begin # same idea as above if there were any local tasks fetching on this Future
677+
call_on_owner(put_future, r, v, myid())
678+
set_future_cache(r, v)
679+
end
680+
end
681+
r
623682
end
683+
684+
function set_future_cache(r::Future, v)
685+
_, ok = @atomicreplace r.v nothing => Some(v)
686+
ok || error("internal consistency error detected for Future")
687+
end
688+
624689
function put_future(rid, v, caller)
625690
rv = lookup_ref(rid)
626691
isready(rv) && error("Future can be set only once")

test/distributed_exec.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,9 @@ function test_regular_io_ser(ref::Distributed.AbstractRemoteRef)
350350
v = getfield(ref2, fld)
351351
if isa(v, Number)
352352
@test v === zero(typeof(v))
353+
elseif fld == :lock
354+
@test v isa ReentrantLock
355+
@test !islocked(v)
353356
elseif v !== nothing
354357
error(string("Add test for field ", fld))
355358
end

0 commit comments

Comments
 (0)