Skip to content

Commit

Permalink
Deserializing a SharedArray resulted in a copy of the container.
Browse files Browse the repository at this point in the history
Check and return existing SharedArray objects when deserializing
on the node which created the array.
  • Loading branch information
amitmurthy committed Jun 1, 2017
1 parent fc6cf1a commit 051c0dd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
35 changes: 33 additions & 2 deletions base/sharedarray.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

import .Serializer: serialize_cycle_header, serialize_type, writetag, UNDEFREF_TAG
import .Distributed: RRID

mutable struct SharedArray{T,N} <: DenseArray{T,N}
id::RRID
dims::NTuple{N,Int}
pids::Vector{Int}
refs::Vector
Expand All @@ -24,10 +26,14 @@ mutable struct SharedArray{T,N} <: DenseArray{T,N}
loc_subarr_1d::SubArray{T,1,Array{T,1},Tuple{UnitRange{Int}},true}

function SharedArray{T,N}(d,p,r,sn,s) where {T,N}
new(d,p,r,sn,s,0,view(Array{T}(ntuple(d->0,N)), 1:0))
S = new(RRID(),d,p,r,sn,s,0,view(Array{T}(ntuple(d->0,N)), 1:0))
sa_refs[S.id] = WeakRef(S)
S
end
end

const sa_refs = Dict{RRID, WeakRef}()

"""
SharedArray{T}(dims::NTuple; init=false, pids=Int[])
SharedArray{T,N}(...)
Expand All @@ -45,6 +51,9 @@ computation with the master process acting as a driver.
If an `init` function of the type `initfn(S::SharedArray)` is specified, it is called on all
the participating workers.
The shared array is valid as long as a reference to the `SharedArray` object exists on the node
which created the mapping.
SharedArray{T}(filename::AbstractString, dims::NTuple, [offset=0]; mode=nothing, init=false, pids=Int[])
SharedArray{T,N}(...)
Expand Down Expand Up @@ -246,10 +255,12 @@ function finalize_refs(S::SharedArray{T,N}) where T where N
empty!(S.refs)
init_loc_flds(S)
S.s = Array{T}(ntuple(d->0,N))
delete!(sa_refs, S.id)
end
S
end


const SharedVector{T} = SharedArray{T,1}
const SharedMatrix{T} = SharedArray{T,2}

Expand Down Expand Up @@ -402,6 +413,17 @@ end
# pidx, which is relevant to the current process only
function serialize(s::AbstractSerializer, S::SharedArray)
serialize_cycle_header(s, S) && return

destpid = worker_id_from_socket(s.io)
if S.id.whence == destpid
# The shared array was created from destpid, hence a reference to it
# must be available at destpid.
serialize(s, true)
serialize(s, S.id.whence)
serialize(s, S.id.id)
return
end
serialize(s, false)
for n in fieldnames(SharedArray)
if n in [:s, :pidx, :loc_subarr_1d]
writetag(s.io, UNDEFREF_TAG)
Expand All @@ -421,9 +443,18 @@ function serialize(s::AbstractSerializer, S::SharedArray)
end

function deserialize(s::AbstractSerializer, t::Type{<:SharedArray})
ref_exists = deserialize(s)
if ref_exists
sref = sa_refs[RRID(deserialize(s), deserialize(s))]
if sref.value !== nothing
return sref.value
end
error("Expected reference to shared array instance not found")
end

S = invoke(deserialize, Tuple{AbstractSerializer,DataType}, s, t)
init_loc_flds(S, true)
S
return S
end

function show(io::IO, S::SharedArray)
Expand Down
12 changes: 12 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,18 @@ d = SharedArray{Int}(10)
finalize(d)
@test_throws BoundsError d[1]

# Issue 22139
aorig = a1 = SharedArray{Float64}((3, 3))
a1 = remotecall_fetch(fill!, id_other, a1, 1.0)
@test object_id(aorig) == object_id(a1)
id = a1.id
aorig = nothing
a1 = remotecall_fetch(fill!, id_other, a1, 1.0)
gc(); gc()
a1 = remotecall_fetch(fill!, id_other, a1, 1.0)
@test haskey(Base.sa_refs, id)
finalize(a1)
@test !haskey(Base.sa_refs, id)

# Test @parallel load balancing - all processors should get either M or M+1
# iterations out of the loop range for some M.
Expand Down

0 comments on commit 051c0dd

Please sign in to comment.