Skip to content

Commit

Permalink
Improve inference for paths leading to similar (#37163)
Browse files Browse the repository at this point in the history
* Infer better eachindex broadcasting

* Fix a misuse of show_datatype

* Improve inference in vcat(A::BitMatrix...)

Because the tuple-length is unknown and because inference gives up
easily in the face of missing type parameters, the generator expressions
in the previous implementation were poorly inferred.

* Use Vector{String}  in Cmd field type

* Introduce ntupleany and use mapany in more places

This also makes mapany safe for iterators without `length`

* Add types to some comprehensions and lists

* Add some type-asserts and argtypes

* AbstractString->String in Distributed.ProcessGroup

* Update base/Enums.jl

* Update base/abstractarray.jl

Co-authored-by: Pablo Zubieta <8410335+pabloferz@users.noreply.github.com>
  • Loading branch information
timholy and pabloferz authored Sep 1, 2020
1 parent 03b4b01 commit 6f93bbe
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
using Base: Process, Semaphore, JLOptions, buffer_writes, @sync_add,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, something, notnothing, isbuffered
shell_escape_posixly, uv_error, something, notnothing, isbuffered,
mapany
using Base.Threads: Event

using Serialization, Sockets
Expand Down
14 changes: 7 additions & 7 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,10 @@ function create_worker(manager, wconfig)
end
end

all_locs = map(x -> isa(x, Worker) ?
(something(x.config.connect_at, ()), x.id) :
((), x.id, true),
join_list)
all_locs = mapany(x -> isa(x, Worker) ?
(something(x.config.connect_at, ()), x.id) :
((), x.id, true),
join_list)
send_connection_hdr(w, true)
enable_threaded_blas = something(wconfig.enable_threaded_blas, false)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
Expand Down Expand Up @@ -765,7 +765,7 @@ let next_pid = 2 # 1 is reserved for the client (always)
end

mutable struct ProcessGroup
name::AbstractString
name::String
workers::Array{Any,1}
refs::Dict{RRID,Any} # global references
topology::Symbol
Expand Down Expand Up @@ -1024,8 +1024,8 @@ end
function _rmprocs(pids, waitfor)
lock(worker_lock)
try
rmprocset = []
for p in vcat(pids...)
rmprocset = Union{LocalProcess, Worker}[]
for p in pids
if p == 1
@warn "rmprocs: process 1 not removed"
else
Expand Down
2 changes: 1 addition & 1 deletion src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ function preduce(reducer, f, R)
schedule(t)
push!(w_exec, t)
end
reduce(reducer, [fetch(t) for t in w_exec])
reduce(reducer, Any[fetch(t) for t in w_exec])
end

function pfor(f, R)
Expand Down
2 changes: 1 addition & 1 deletion src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
end

function connect_w2w(pid::Int, config::WorkerConfig)
(rhost, rport) = notnothing(config.connect_at)::Tuple{AbstractString, Int}
(rhost, rport) = notnothing(config.connect_at)::Tuple{String, Int}
config.host = rhost
config.port = rport
(s, bind_addr) = connect_to_worker(rhost, rport)
Expand Down
6 changes: 3 additions & 3 deletions src/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ extract_exception(e) = isa(e, RemoteException) ? e.captured.ex : e

function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check)
# Handle all the ones in error in another pmap, with batch size set to 1
reprocess = []
reprocess = Tuple{Int,BatchProcessingError}[]
for (idx, v) in enumerate(results)
if isa(v, BatchProcessingError)
push!(reprocess, (idx,v))
Expand All @@ -210,14 +210,14 @@ function process_batch_errors!(p, f, results, on_error, retry_delays, retry_chec

if length(reprocess) > 0
errors = [x[2] for x in reprocess]
exceptions = [x.ex for x in errors]
exceptions = Any[x.ex for x in errors]
state = iterate(retry_delays)
state !== nothing && (state = state[2])
error_processed = let state=state
if (length(retry_delays)::Int > 0) &&
(retry_check === nothing || all([retry_check(state,ex)[2] for ex in exceptions]))
# BatchProcessingError.data is a tuple of original args
pmap(x->f(x...), p, [x.data for x in errors];
pmap(x->f(x...), p, Any[x.data for x in errors];
on_error = on_error, retry_delays = collect(retry_delays)[2:end::Int], retry_check = retry_check)
elseif on_error !== nothing
map(on_error, exceptions)
Expand Down

0 comments on commit 6f93bbe

Please sign in to comment.