Skip to content

rename Parallel to Distributed #20486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

module Parallel
module Distributed

# imports for extension
import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
Expand Down
2 changes: 1 addition & 1 deletion base/parallel/cluster.jl → base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ function disable_nagle(sock)
@static if is_linux()
# tcp_quickack is a linux only option
if ccall(:jl_tcp_quickack, Cint, (Ptr{Void}, Cint), sock.handle, 1) < 0
warn_once("Parallel networking unoptimized ( Error enabling TCP_QUICKACK : ", Libc.strerror(Libc.errno()), " )")
warn_once("Networking unoptimized ( Error enabling TCP_QUICKACK : ", Libc.strerror(Libc.errno()), " )")
end
end
end
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
68 changes: 34 additions & 34 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ precompile(Base.LineEdit.write_prompt, (Base.Terminals.TTYTerminal, Base.LineEdi
precompile(Base.LineEdit.write_prompt, (Base.Terminals.TerminalBuffer, Base.LineEdit.PromptState))
precompile(Base.Multimedia.TextDisplay, (Base.TTY,))
precompile(Base.Multimedia.display, (Int,))
precompile(Base.Parallel.ProcessGroup, (Int, Array{Any,1}, Array{Any,1}))
precompile(Base.Distributed.ProcessGroup, (Int, Array{Any,1}, Array{Any,1}))
precompile(Base.REPL.:(==), (Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}, Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}))
precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal, Bool, String, String, String, String, String, Bool, Bool, Bool, Bool))
precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal,))
Expand Down Expand Up @@ -140,8 +140,8 @@ precompile(Base.REPLCompletions.completions, (String, Int))
precompile(Base.Random.srand, ())
precompile(Base.Random.srand, (String, Int))
precompile(Base.Random.srand, (UInt,))
precompile(Base.Parallel.RemoteChannel, (Int, Int, Int))
precompile(Base.Parallel.RemoteValue, ())
precompile(Base.Distributed.RemoteChannel, (Int, Int, Int))
precompile(Base.Distributed.RemoteValue, ())
precompile(Base.Set, ())
precompile(Base.SystemError, (String,))
precompile(Base.TCPSocket, (Ptr{Void},))
Expand Down Expand Up @@ -207,8 +207,8 @@ precompile(Base.fdio, (Int32,))
precompile(Base.fill!, (BitArray{1}, Bool))
precompile(Base.first, (UnitRange{Int},))
precompile(Base.flush, (IOStream,))
precompile(Base.Parallel.flush_gc_msgs, ())
precompile(Base.Parallel.flush_gc_msgs, (Base.Parallel.Worker,))
precompile(Base.Distributed.flush_gc_msgs, ())
precompile(Base.Distributed.flush_gc_msgs, (Base.Distributed.Worker,))
precompile(Base.get, (Base.EnvHash, String, String))
precompile(Base.get, (Dict{Any, Any}, Tuple{Int, Int}, Bool))
precompile(Base.get, (Dict{Any,Any}, Symbol, String))
Expand All @@ -223,8 +223,8 @@ precompile(Base.getindex, (Type{String}, String, String))
precompile(Base.getindex, (Type{Dict{Any, Any}}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}))
precompile(Base.getpid, ())
precompile(Base.hash, (Int,))
precompile(Base.hash, (Base.Parallel.RemoteChannel, UInt))
precompile(Base.hash, (Base.Parallel.RemoteChannel,))
precompile(Base.hash, (Base.Distributed.RemoteChannel, UInt))
precompile(Base.hash, (Base.Distributed.RemoteChannel,))
precompile(Base.haskey, (Base.EnvHash, String))
precompile(Base.haskey, (Dict{Symbol,Any}, Symbol))
precompile(Base.haskey, (ObjectIdDict, Symbol))
Expand Down Expand Up @@ -254,8 +254,8 @@ precompile(Base.isequal, (Base.LineEdit.Prompt, Base.LineEdit.Prompt))
precompile(Base.isequal, (Bool, Bool))
precompile(Base.isequal, (Char, String))
precompile(Base.isequal, (Int,Int))
precompile(Base.isequal, (Base.Parallel.RemoteChannel, Base.Parallel.RemoteChannel))
precompile(Base.isequal, (Base.Parallel.RemoteChannel, WeakRef))
precompile(Base.isequal, (Base.Distributed.RemoteChannel, Base.Distributed.RemoteChannel))
precompile(Base.isequal, (Base.Distributed.RemoteChannel, WeakRef))
precompile(Base.isequal, (Symbol, Symbol))
precompile(Base.isequal, (VersionNumber, VersionNumber))
precompile(Base.isequal, (Void, Void))
Expand Down Expand Up @@ -333,7 +333,7 @@ precompile(Base.reinit_stdio, ())
precompile(Base.repeat, (String, Int))
precompile(Base.repl_cmd, (Cmd, Base.Terminals.TTYTerminal))
precompile(Base.require, (Symbol,))
precompile(Base.Parallel.remoteref_id, (Base.Parallel.RemoteChannel,))
precompile(Base.Distributed.remoteref_id, (Base.Distributed.RemoteChannel,))
precompile(Base.rsearch, (String, Char))
precompile(Base.rstrip, (String,))
precompile(Base.run, (Cmd,))
Expand Down Expand Up @@ -388,12 +388,12 @@ precompile(Base.Symbol, (SubString{String},))
precompile(Base.sync_begin, ())
precompile(Base.sync_end, ())
precompile(Base.systemerror, (Symbol, Bool))
precompile(Base.Parallel.take!, (Base.Parallel.RemoteValue,))
precompile(Base.Parallel.take!, (Base.Parallel.RemoteChannel,))
precompile(Base.Parallel.take_ref, (Tuple{Int,Int},))
precompile(Base.Distributed.take!, (Base.Distributed.RemoteValue,))
precompile(Base.Distributed.take!, (Base.Distributed.RemoteChannel,))
precompile(Base.Distributed.take_ref, (Tuple{Int,Int},))
precompile(Base.take!, (IOBuffer,))
precompile(Base.task_local_storage, ())
precompile(Base.Parallel.terminate_all_workers, ())
precompile(Base.Distributed.terminate_all_workers, ())
precompile(Base.try_include, (String,))
precompile(Base.UInt, (UInt,))
precompile(Base.unsafe_copy!, (Array{Dict{Any, Any}, 1}, Int, Array{Dict{Any, Any}, 1}, Int, Int))
Expand All @@ -403,7 +403,7 @@ precompile(Base.uv_error, (String, Bool))
precompile(Base.uvfinalize, (Base.TTY,))
precompile(Base.vcat, (Base.LineEdit.Prompt,))
precompile(Base.wait, ())
precompile(Base.Parallel.wait, (Base.Parallel.RemoteChannel,))
precompile(Base.Distributed.wait, (Base.Distributed.RemoteChannel,))
precompile(Base.write, (Base.Terminals.TTYTerminal, String))
precompile(Base.write, (Base.Terminals.TerminalBuffer, String))
precompile(Base.write, (IOBuffer, Vector{UInt8}))
Expand Down Expand Up @@ -445,9 +445,9 @@ precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool))
precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState))
precompile(Base.schedule, (Array{Any, 1}, Task, Void))
precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any}))
precompile(==, (Base.Parallel.RemoteChannel, WeakRef))
precompile(==, (Base.Parallel.RemoteChannel, Base.Parallel.RemoteChannel))
precompile(Base.Parallel.send_del_client, (Base.Parallel.RemoteChannel,))
precompile(==, (Base.Distributed.RemoteChannel, WeakRef))
precompile(==, (Base.Distributed.RemoteChannel, Base.Distributed.RemoteChannel))
precompile(Base.Distributed.send_del_client, (Base.Distributed.RemoteChannel,))
precompile(!=, (Base.SubString{String}, String))
precompile(Base.join, (Base.IOBuffer, Array{Base.SubString{String}, 1}, String))
precompile(Base.joinpath, (String, String, String, String))
Expand All @@ -459,22 +459,22 @@ precompile(Base.vect, (Base.LineEdit.Prompt, String))
isdefined(Core, :Inference) && Base.code_typed(Base.code_typed)

# Speeding up addprocs for LocalManager
precompile(Base.Parallel.start_worker, ())
precompile(Base.Parallel.start_worker, (Base.TTY,))
precompile(Base.Parallel.process_messages, (Base.TCPSocket, Base.TCPSocket))
precompile(Base.Parallel.process_messages, (Base.TCPSocket, Base.TCPSocket, Void))
precompile(Base.Parallel.process_tcp_streams, (Base.TCPSocket, Base.TCPSocket, Void))
precompile(Base.Parallel.message_handler_loop, (Base.TCPSocket, Base.TCPSocket, Void))
precompile(Base.Parallel.connect_to_peer, (Base.Parallel.LocalManager, Int64, Base.Parallel.WorkerConfig))
precompile(Base.Parallel.connect, (Base.Parallel.LocalManager, Int64, Base.Parallel.WorkerConfig))
precompile(Base.Parallel.connect_w2w, (Int64, Base.Parallel.WorkerConfig))
precompile(Base.Parallel.connect_to_worker, (String, Int64))
precompile(Base.Parallel.addprocs, (Base.Parallel.LocalManager, ))
precompile(Base.Parallel.addprocs, (Int, ))
precompile(Base.Parallel.setup_launched_worker, (Base.Parallel.LocalManager, Dict, Base.Parallel.WorkerConfig, Array{Int,1}))
precompile(Base.Parallel.create_worker, (Base.Parallel.LocalManager, Dict, Base.Parallel.WorkerConfig))
precompile(Base.Parallel.launch, (Base.Parallel.LocalManager, Dict, Array{Base.Parallel.WorkerConfig, 1}, Base.Condition))
precompile(Base.Parallel.set_valid_processes, (Array{Int, 1}, ))
precompile(Base.Distributed.start_worker, ())
precompile(Base.Distributed.start_worker, (Base.TTY,))
precompile(Base.Distributed.process_messages, (Base.TCPSocket, Base.TCPSocket))
precompile(Base.Distributed.process_messages, (Base.TCPSocket, Base.TCPSocket, Void))
precompile(Base.Distributed.process_tcp_streams, (Base.TCPSocket, Base.TCPSocket, Void))
precompile(Base.Distributed.message_handler_loop, (Base.TCPSocket, Base.TCPSocket, Void))
precompile(Base.Distributed.connect_to_peer, (Base.Distributed.LocalManager, Int64, Base.Distributed.WorkerConfig))
precompile(Base.Distributed.connect, (Base.Distributed.LocalManager, Int64, Base.Distributed.WorkerConfig))
precompile(Base.Distributed.connect_w2w, (Int64, Base.Distributed.WorkerConfig))
precompile(Base.Distributed.connect_to_worker, (String, Int64))
precompile(Base.Distributed.addprocs, (Base.Distributed.LocalManager, ))
precompile(Base.Distributed.addprocs, (Int, ))
precompile(Base.Distributed.setup_launched_worker, (Base.Distributed.LocalManager, Dict, Base.Distributed.WorkerConfig, Array{Int,1}))
precompile(Base.Distributed.create_worker, (Base.Distributed.LocalManager, Dict, Base.Distributed.WorkerConfig))
precompile(Base.Distributed.launch, (Base.Distributed.LocalManager, Dict, Array{Base.Distributed.WorkerConfig, 1}, Base.Condition))
precompile(Base.Distributed.set_valid_processes, (Array{Int, 1}, ))

# Speed up repl help
sprint(Markdown.term, @doc mean)
Expand Down
6 changes: 3 additions & 3 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ importall .SparseArrays

include("asyncmap.jl")

include("parallel/Parallel.jl")
importall .Parallel
include("distributed/Distributed.jl")
importall .Distributed
include("sharedarray.jl")

# code loading
Expand Down Expand Up @@ -385,7 +385,7 @@ function __init__()
Multimedia.reinit_displays() # since Multimedia.displays uses STDOUT as fallback
early_init()
init_load_path()
Parallel.init_parallel()
Distributed.init_parallel()
init_threadcall()
end

Expand Down
8 changes: 4 additions & 4 deletions doc/src/stdlib/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ Base.CachingPool
Base.default_worker_pool
Base.clear!(::CachingPool)
Base.remote
Base.remotecall(::Any, ::Base.Parallel.AbstractWorkerPool, ::Any...)
Base.remotecall_wait(::Any, ::Base.Parallel.AbstractWorkerPool, ::Any...)
Base.remotecall_fetch(::Any, ::Base.Parallel.AbstractWorkerPool, ::Any...)
Base.remote_do(::Any, ::Base.Parallel.AbstractWorkerPool, ::Any...)
Base.remotecall(::Any, ::Base.Distributed.AbstractWorkerPool, ::Any...)
Base.remotecall_wait(::Any, ::Base.Distributed.AbstractWorkerPool, ::Any...)
Base.remotecall_fetch(::Any, ::Base.Distributed.AbstractWorkerPool, ::Any...)
Base.remote_do(::Any, ::Base.Distributed.AbstractWorkerPool, ::Any...)
Base.timedwait
Base.@spawn
Base.@spawnat
Expand Down
4 changes: 2 additions & 2 deletions test/choosetests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function choosetests(choices = [])
"nullable", "meta", "stacktraces", "profile", "libgit2", "docs",
"markdown", "base64", "serialize", "misc", "threads",
"enums", "cmdlineargs", "i18n", "workspace", "libdl", "int",
"checked", "intset", "floatfuncs", "compile", "parallel", "inline",
"checked", "intset", "floatfuncs", "compile", "distributed", "inline",
"boundscheck", "error", "ambiguous", "cartesian", "asmvariant", "osutils",
"channels"
]
Expand Down Expand Up @@ -143,7 +143,7 @@ function choosetests(choices = [])
prepend!(tests, linalgtests)
end

net_required_for = ["socket", "parallel", "libgit2"]
net_required_for = ["socket", "distributed", "libgit2"]
net_on = true
try
ipa = getipaddr()
Expand Down
6 changes: 3 additions & 3 deletions test/parallel.jl → test/distributed.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

# Run the parallel test outside of the main driver since it needs its own
# Run the distributed test outside of the main driver since it needs its own
# set of dedicated workers.

inline_flag = Base.JLOptions().can_inline == 1 ? `` : `--inline=no`
Expand All @@ -11,8 +11,8 @@ elseif Base.JLOptions().code_coverage == 2
cov_flag = `--code-coverage=all`
end

cmd = `$(Base.julia_cmd()) $inline_flag $cov_flag --check-bounds=yes --startup-file=no --depwarn=error parallel_exec.jl`
cmd = `$(Base.julia_cmd()) $inline_flag $cov_flag --check-bounds=yes --startup-file=no --depwarn=error distributed_exec.jl`

if !success(pipeline(cmd; stdout=STDOUT, stderr=STDERR)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
error("Parallel test failed, cmd : $cmd")
error("Distributed test failed, cmd : $cmd")
end
52 changes: 26 additions & 26 deletions test/parallel_exec.jl → test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,21 @@ function test_futures_dgc(id)
fid = Base.remoteref_id(f)

# remote value should be deleted after a fetch
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, fid) == true
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == true
@test isnull(f.v) == true
@test fetch(f) == id
@test isnull(f.v) == false
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, fid) == false
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == false


# if unfetched, it should be deleted after a finalize
f = remotecall(myid, id)
fid = Base.remoteref_id(f)
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, fid) == true
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == true
@test isnull(f.v) == true
finalize(f)
Base.Parallel.flush_gc_msgs()
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, fid) == false
Base.Distributed.flush_gc_msgs()
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, fid) == false
end

test_futures_dgc(id_me)
Expand All @@ -136,22 +136,22 @@ fstore = RemoteChannel(wid2)
put!(fstore, f)

@test fetch(f) == wid1
@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, fid) == true
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == true
remotecall_fetch(r->fetch(fetch(r)), wid2, fstore)
@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, fid) == false
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == false

# put! should release remote reference since it would have been cached locally
f = Future(wid1)
fid = Base.remoteref_id(f)

# should not be created remotely till accessed
@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, fid) == false
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == false
# create it remotely
isready(f)

@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, fid) == true
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == true
put!(f, :OK)
@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, fid) == false
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == false
@test fetch(f) == :OK

# RemoteException should be thrown on a put! when another process has set the value
Expand All @@ -162,7 +162,7 @@ fstore = RemoteChannel(wid2)
put!(fstore, f) # send f to wid2
put!(f, :OK) # set value from master

@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, fid) == true
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, fid) == true

testval = remotecall_fetch(wid2, fstore) do x
try
Expand All @@ -185,12 +185,12 @@ function test_remoteref_dgc(id)
rrid = Base.remoteref_id(rr)

# remote value should be deleted after finalizing the ref
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, rrid) == true
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, rrid) == true
@test fetch(rr) == :OK
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, rrid) == true
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, rrid) == true
finalize(rr)
Base.Parallel.flush_gc_msgs()
@test remotecall_fetch(k->(yield();haskey(Base.Parallel.PGRP.refs, k)), id, rrid) == false
Base.Distributed.flush_gc_msgs()
@test remotecall_fetch(k->(yield();haskey(Base.Distributed.PGRP.refs, k)), id, rrid) == false
end
test_remoteref_dgc(id_me)
test_remoteref_dgc(id_other)
Expand All @@ -204,12 +204,12 @@ rrid = Base.remoteref_id(rr)
fstore = RemoteChannel(wid2)
put!(fstore, rr)

@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, rrid) == true
finalize(rr); Base.Parallel.flush_gc_msgs() # finalize locally
@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, rrid) == true
remotecall_fetch(r->(finalize(take!(r)); Base.Parallel.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, rrid) == true
finalize(rr); Base.Distributed.flush_gc_msgs() # finalize locally
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, rrid) == true
remotecall_fetch(r->(finalize(take!(r)); Base.Distributed.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely
sleep(0.5) # to ensure that wid2 messages have been executed on wid1
@test remotecall_fetch(k->haskey(Base.Parallel.PGRP.refs, k), wid1, rrid) == false
@test remotecall_fetch(k->haskey(Base.Distributed.PGRP.refs, k), wid1, rrid) == false

@test fetch(@spawnat id_other myid()) == id_other
@test (@fetchfrom id_other myid()) == id_other
Expand Down Expand Up @@ -669,7 +669,7 @@ let ex
end

# pmap tests. Needs at least 4 processors dedicated to the below tests. Which we currently have
# since the parallel tests are now spawned as a separate set.
# since the distributed tests are now spawned as a separate set.

# Test all combinations of pmap keyword args.
pmap_args = [
Expand Down Expand Up @@ -770,7 +770,7 @@ end
n = 10
as = [rand(4,4) for i in 1:n]
bs = deepcopy(as)
cs = collect(Base.Parallel.pgenerate(x->(sleep(rand()*0.1); svdfact(x)), bs))
cs = collect(Base.Distributed.pgenerate(x->(sleep(rand()*0.1); svdfact(x)), bs))
svdas = map(svdfact, as)
for i in 1:n
@test cs[i][:U] ≈ svdas[i][:U]
Expand Down Expand Up @@ -921,7 +921,7 @@ if DoFullTest
all_w = workers()
# Test sending fake data to workers. The worker processes will print an
# error message but should not terminate.
for w in Base.Parallel.PGRP.workers
for w in Base.Distributed.PGRP.workers
if isa(w, Base.Worker)
s = connect(get(w.config.host), get(w.config.port))
write(s, randstring(32))
Expand Down Expand Up @@ -1195,7 +1195,7 @@ function get_remote_num_threads(processes_added)
end

function test_blas_config(pid, expected)
for worker in Base.Parallel.PGRP.workers
for worker in Base.Distributed.PGRP.workers
if worker.id == pid
@test get(worker.config.enable_threaded_blas) == expected
return
Expand Down Expand Up @@ -1380,7 +1380,7 @@ wrapped_var_ser_tests()
global ids_cleanup = ones(6)
global ids_func = ()->ids_cleanup

clust_ser = (Base.Parallel.worker_from_id(id_other)).w_serializer
clust_ser = (Base.Distributed.worker_from_id(id_other)).w_serializer
@test remotecall_fetch(ids_func, id_other) == ids_cleanup

@test haskey(clust_ser.glbs_sent, object_id(ids_cleanup))
Expand All @@ -1389,7 +1389,7 @@ finalize(ids_cleanup)

# TODO Add test for cleanup from `clust_ser.glbs_in_tnobj`

# reported github issues - Mostly tests with globals and various parallel macros
# reported github issues - Mostly tests with globals and various distributed macros
#2669, #5390
v2669=10
@test fetch(@spawn (1+v2669)) == 11
Expand Down
4 changes: 2 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ function move_to_node1(t)
end
# Base.compile only works from node 1, so compile test is handled specially
move_to_node1("compile")
# In a constrained memory environment, run the parallel test after all other tests
# In a constrained memory environment, run the "distributed" test after all other tests
# since it starts a lot of workers and can easily exceed the maximum memory
max_worker_rss != typemax(Csize_t) && move_to_node1("parallel")
max_worker_rss != typemax(Csize_t) && move_to_node1("distributed")

cd(dirname(@__FILE__)) do
n = 1
Expand Down