@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
26
26
where :: Int
27
27
whence:: Int
28
28
id:: Int
29
- v:: Union{Some{Any}, Nothing}
29
+ lock:: ReentrantLock
30
+ @atomic v:: Union{Some{Any}, Nothing}
30
31
31
32
Future (w:: Int , rrid:: RRID , v:: Union{Some, Nothing} = nothing ) =
32
- (r = new (w,rrid. whence,rrid. id,v); return test_existing_ref (r))
33
+ (r = new (w,rrid. whence,rrid. id,ReentrantLock (), v); return test_existing_ref (r))
33
34
34
- Future (t:: NTuple{4, Any} ) = new (t[1 ],t[2 ],t[3 ],t[4 ]) # Useful for creating dummy, zeroed-out instances
35
+ Future (t:: NTuple{4, Any} ) = new (t[1 ],t[2 ],t[3 ],ReentrantLock (), t[4 ]) # Useful for creating dummy, zeroed-out instances
35
36
end
36
37
37
38
"""
@@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef)
69
70
found = getkey (client_refs, r, nothing )
70
71
if found != = nothing
71
72
@assert r. where > 0
72
- if isa (r, Future) && found. v === nothing && r. v != = nothing
73
- # we have recd the value from another source, probably a deserialized ref, send a del_client message
74
- send_del_client (r)
75
- found. v = r. v
73
+ if isa (r, Future)
74
+ # this is only for copying the reference from Future to RemoteRef (just created)
75
+ fv_cache = @atomic :acquire found. v
76
+ rv_cache = @atomic :monotonic r. v
77
+ if fv_cache === nothing && rv_cache != = nothing
78
+ # we have recd the value from another source, probably a deserialized ref, send a del_client message
79
+ send_del_client (r)
80
+ @lock found. lock begin
81
+ @atomicreplace found. v nothing => rv_cache
82
+ end
83
+ end
76
84
end
77
85
return found:: typeof (r)
78
86
end
@@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef)
91
99
send_del_client_no_lock (r)
92
100
else
93
101
# send_del_client only if the reference has not been set
94
- r. v === nothing && send_del_client_no_lock (r)
95
- r. v = nothing
102
+ v_cache = @atomic :monotonic r. v
103
+ v_cache === nothing && send_del_client_no_lock (r)
104
+ @atomic :monotonic r. v = nothing
96
105
end
97
106
r. where = 0
98
107
finally
@@ -201,7 +210,8 @@ isready(f) # will not block
201
210
```
202
211
"""
203
212
function isready (rr:: Future )
204
- rr. v === nothing || return true
213
+ v_cache = @atomic rr. v
214
+ v_cache === nothing || return true
205
215
206
216
rid = remoteref_id (rr)
207
217
return if rr. where == myid ()
@@ -354,26 +364,33 @@ end
354
364
355
365
channel_type (rr:: RemoteChannel{T} ) where {T} = T
356
366
357
- serialize (s:: ClusterSerializer , f:: Future ) = serialize (s, f, f. v === nothing )
358
- serialize (s:: ClusterSerializer , rr:: RemoteChannel ) = serialize (s, rr, true )
359
- function serialize (s:: ClusterSerializer , rr:: AbstractRemoteRef , addclient)
360
- if addclient
367
+ function serialize (s:: ClusterSerializer , f:: Future )
368
+ v_cache = @atomic f. v
369
+ if v_cache === nothing
361
370
p = worker_id_from_socket (s. io)
362
- (p != = rr . where) && send_add_client (rr , p)
371
+ (p != = f . where) && send_add_client (f , p)
363
372
end
373
+ fc = Future ((f. where, f. whence, f. id, v_cache)) # copy to be used for serialization (contains a reset lock)
374
+ invoke (serialize, Tuple{ClusterSerializer, Any}, s, fc)
375
+ end
376
+
377
+ function serialize (s:: ClusterSerializer , rr:: RemoteChannel )
378
+ p = worker_id_from_socket (s. io)
379
+ (p != = rr. where) && send_add_client (rr, p)
364
380
invoke (serialize, Tuple{ClusterSerializer, Any}, s, rr)
365
381
end
366
382
367
383
function deserialize (s:: ClusterSerializer , t:: Type{<:Future} )
368
- f = invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t)
369
- f2 = Future (f . where, RRID (f . whence, f . id), f . v) # ctor adds to client_refs table
384
+ fc = invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy
385
+ f2 = Future (fc . where, RRID (fc . whence, fc . id), fc . v) # ctor adds to client_refs table
370
386
371
387
# 1) send_add_client() is not executed when the ref is being serialized
372
388
# to where it exists, hence do it here.
373
389
# 2) If we have received a 'fetch'ed Future or if the Future ctor found an
374
390
# already 'fetch'ed instance in client_refs (Issue #25847), we should not
375
391
# track it in the backing RemoteValue store.
376
- if f2. where == myid () && f2. v === nothing
392
+ f2v_cache = @atomic f2. v
393
+ if f2. where == myid () && f2v_cache === nothing
377
394
add_client (remoteref_id (f2), myid ())
378
395
end
379
396
f2
567
584
568
585
Wait for a value to become available for the specified [`Future`](@ref).
569
586
"""
570
- wait (r:: Future ) = (r. v != = nothing && return r; call_on_owner (wait_ref, r, myid ()); r)
587
+ wait (r:: Future ) = (v_cache = @atomic r. v; v_cache != = nothing && return r; call_on_owner (wait_ref, r, myid ()); r)
571
588
572
589
"""
573
590
wait(r::RemoteChannel, args...)
@@ -584,11 +601,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
584
601
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
585
602
"""
586
603
function fetch (r:: Future )
587
- r. v != = nothing && return something (r. v)
588
- v = call_on_owner (fetch_ref, r)
589
- r. v = Some (v)
604
+ v_cache = @atomic r. v
605
+ v_cache != = nothing && return something (v_cache)
606
+
607
+ if r. where == myid ()
608
+ rv, v_cache = @lock r. lock begin
609
+ v_cache = @atomic :monotonic r. v
610
+ rv = v_cache === nothing ? lookup_ref (remoteref_id (r)) : nothing
611
+ rv, v_cache
612
+ end
613
+
614
+ if v_cache != = nothing
615
+ return something (v_cache)
616
+ else
617
+ v_local = fetch (rv. c)
618
+ end
619
+ else
620
+ v_local = call_on_owner (fetch_ref, r)
621
+ end
622
+
623
+ v_cache = @atomic r. v
624
+
625
+ if v_cache === nothing # call_on_owner case
626
+ v_old, status = @lock r. lock begin
627
+ @atomicreplace r. v nothing => Some (v_local)
628
+ end
629
+ # status == true - when value obtained through call_on_owner
630
+ # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
631
+ # why? local put! performs caching and putting into channel under r.lock
632
+
633
+ # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
634
+ v_cache = status ? v_local : v_old
635
+ end
636
+
590
637
send_del_client (r)
591
- v
638
+ something (v_cache)
592
639
end
593
640
594
641
fetch_ref (rid, args... ) = fetch (lookup_ref (rid). c, args... )
@@ -612,12 +659,30 @@ A `put!` on an already set `Future` throws an `Exception`.
612
659
All asynchronous remote calls return `Future`s and set the
613
660
value to the return value of the call upon completion.
614
661
"""
615
- function put! (rr:: Future , v)
616
- rr. v != = nothing && error (" Future can be set only once" )
617
- call_on_owner (put_future, rr, v, myid ())
618
- rr. v = Some (v)
619
- rr
662
+ function put! (r:: Future , v)
663
+ if r. where == myid ()
664
+ rid = remoteref_id (r)
665
+ rv = lookup_ref (rid)
666
+ isready (rv) && error (" Future can be set only once" )
667
+ @lock r. lock begin
668
+ put! (rv, v) # this notifies the tasks waiting on the channel in fetch
669
+ set_future_cache (r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
670
+ end
671
+ del_client (rid, myid ())
672
+ else
673
+ @lock r. lock begin # same idea as above if there were any local tasks fetching on this Future
674
+ call_on_owner (put_future, r, v, myid ())
675
+ set_future_cache (r, v)
676
+ end
677
+ end
678
+ r
620
679
end
680
+
681
+ function set_future_cache (r:: Future , v)
682
+ _, ok = @atomicreplace r. v nothing => Some (v)
683
+ ok || error (" internal consistency error detected for Future" )
684
+ end
685
+
621
686
function put_future (rid, v, caller)
622
687
rv = lookup_ref (rid)
623
688
isready (rv) && error (" Future can be set only once" )
0 commit comments