Skip to content

Commit

Permalink
reorder pmap arguments to allow do-block syntax (#26783)
Browse files Browse the repository at this point in the history
  • Loading branch information
mweastwood authored and JeffBezanson committed Apr 12, 2018
1 parent 28554ad commit d358068
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
6 changes: 6 additions & 0 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,14 @@ include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
include("precompile.jl")

# Deprecations

@eval @deprecate $(Symbol("@parallel")) $(Symbol("@distributed"))

# PR 26783
@deprecate pmap(p::AbstractWorkerPool, f, c; kwargs...) pmap(f, p, c; kwargs...)
@deprecate pmap(p::AbstractWorkerPool, f, c1, c...; kwargs...) pmap(f, p, c1, c...; kwargs...)

function __init__()
push!(Base.package_callbacks, _require_callback)
init_parallel()
Expand Down
10 changes: 5 additions & 5 deletions src/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pgenerate(f, c) = pgenerate(default_worker_pool(), f, c)
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))

"""
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Transform collection `c` by applying `f` to each element using available
workers and tasks.
Expand Down Expand Up @@ -96,7 +96,7 @@ delays up to 3 times. Return a `NaN` in place for all `InexactError` occurrences
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow(e)), retry_delays = ExponentialBackOff(n = 3))
```
"""
function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
function pmap(f, p::AbstractWorkerPool, c; distributed=true, batch_size=1, on_error=nothing,
retry_delays=[], retry_check=nothing)
f_orig = f
# Don't do remote calls if there are no workers.
Expand Down Expand Up @@ -152,8 +152,8 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er
end
end

pmap(p::AbstractWorkerPool, f, c1, c...; kwargs...) = pmap(p, a->f(a...), zip(c1, c...); kwargs...)
pmap(f, c; kwargs...) = pmap(default_worker_pool(), f, c; kwargs...)
pmap(f, p::AbstractWorkerPool, c1, c...; kwargs...) = pmap(a->f(a...), p, zip(c1, c...); kwargs...)
pmap(f, c; kwargs...) = pmap(f, default_worker_pool(), c; kwargs...)
pmap(f, c1, c...; kwargs...) = pmap(a->f(a...), zip(c1, c...); kwargs...)

function wrap_on_error(f, on_error; capture_data=false)
Expand Down Expand Up @@ -215,7 +215,7 @@ function process_batch_errors!(p, f, results, on_error, retry_delays, retry_chec
if (length(retry_delays) > 0) &&
(retry_check==nothing || all([retry_check(state,ex)[2] for ex in exceptions]))
# BatchProcessingError.data is a tuple of original args
error_processed = pmap(p, x->f(x...), [x.data for x in errors];
error_processed = pmap(x->f(x...), p, [x.data for x in errors];
on_error = on_error, retry_delays = collect(retry_delays)[2:end], retry_check = retry_check)
elseif on_error !== nothing
error_processed = map(on_error, exceptions)
Expand Down
10 changes: 5 additions & 5 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ workloads = Int[sum(ids .== i) for i in 2:nprocs()]
@test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5)

# specify pids for pmap
@test sort(workers()[1:2]) == sort(unique(pmap(WorkerPool(workers()[1:2]), x->(sleep(0.1);myid()), 1:10)))
@test sort(workers()[1:2]) == sort(unique(pmap(x->(sleep(0.1);myid()), WorkerPool(workers()[1:2]), 1:10)))

# Testing buffered and unbuffered reads
# This large array should write directly to the socket
Expand Down Expand Up @@ -546,7 +546,7 @@ walk_args(1)

include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "generic_map_tests.jl"))
empty_pool = WorkerPool([myid()])
pmap_fallback = (f, c...) -> pmap(empty_pool, f, c...)
pmap_fallback = (f, c...) -> pmap(f, empty_pool, c...)
generic_map_tests(pmap_fallback)

# pmap with various types. Test for equivalence with map
Expand Down Expand Up @@ -589,13 +589,13 @@ pmap(_->myid(), 1:nworkers()) # priming run

# Same tests with custom worker pools.
wp = WorkerPool(workers())
@test nworkers() == length(unique(pmap(wp, _->myid(), 1:100)))
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(wp, _->myid(), 1:100), id_other, wp)))
@test nworkers() == length(unique(pmap(_->myid(), wp, 1:100)))
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))


# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(wp, x->x, 1:100)
@test [1:100...] == pmap(x->x, wp, 1:100)

clear!(wp)
@test length(wp.map_obj2ref) == 0
Expand Down

0 comments on commit d358068

Please sign in to comment.