Skip to content

Commit

Permalink
reuse port if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 16, 2015
1 parent ce2d89e commit 40c622b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 19 deletions.
26 changes: 24 additions & 2 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,35 @@ function connect_w2w(pid::Int, config::WorkerConfig)
(s,s)
end

# Local port shared by outgoing worker connections. It starts at 0;
# socket_reuse_port() passes the current value to bind() and then overwrites
# it with the port reported by getsockname().
# NOTE(review): a port of 0 presumably lets the OS choose an ephemeral port on
# the first bind - confirm against jl_tcp_bind / bind(2).
const client_port = Ref{Cushort}(0)

# Create a TCPSocket for an outgoing connection that tries to reuse one local
# port across connections.
#
# Steps, each performed through a native helper:
#   1. bind the socket to the port currently stored in `client_port`
#      (address argument is 0),
#   2. request SO_REUSEPORT on the socket,
#   3. read the bound port back with getsockname() and store it in
#      `client_port` for subsequent calls.
#
# Returns the bound socket on success. If any step reports an error (or
# anything else throws), a one-time warning is emitted and a fresh, unbound
# TCPSocket is returned instead.
function socket_reuse_port()
    sock = TCPSocket()
    try
        # Receives the local address from getsockname(); not used afterwards.
        bound_host = Ref{Cuint}(0)

        rc = ccall(:jl_tcp_bind, Int32,
                   (Ptr{Void}, UInt16, UInt32, Cuint),
                   sock.handle, hton(client_port[]), hton(UInt32(0)), 0)
        if rc < 0
            throw(SystemError("bind() : "))
        end

        rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void}, ), sock.handle)
        if rc < 0
            throw(SystemError("setsockopt() SO_REUSEPORT : "))
        end

        # Remember the port that was actually bound so later sockets ask for it.
        rc = ccall(:jl_tcp_getsockname_v4, Int32,
                   (Ptr{Void}, Ref{Cuint}, Ref{Cushort}),
                   sock.handle, bound_host, client_port)
        if rc < 0
            throw(SystemError("getsockname() : "))
        end
    catch err
        warn_once("Unable to reuse port : ", err)
        # provide a clean new socket
        return TCPSocket()
    end
    return sock
end

function connect_to_worker(host::AbstractString, port::Integer)
# Connect to the loopback port if requested host has the same ipaddress as self.
s = socket_reuse_port()
if host == string(LPROC.bind_addr)
s = connect("127.0.0.1", UInt16(port))
s = connect(s, "127.0.0.1", UInt16(port))
else
s = connect(host, UInt16(port))
s = connect(s, host, UInt16(port))
end

# Avoid calling getaddrinfo if possible - involves a DNS lookup
Expand Down
37 changes: 20 additions & 17 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -894,29 +894,32 @@ function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream, rr_n
end # end of while
catch e
iderr = worker_id_from_socket(r_stream)
werr = worker_from_id(iderr)
oldstate = werr.state
set_worker_state(werr, W_TERMINATED)


# If error occured talking to pid 1, commit harakiri
if iderr == 1
if isopen(w_stream)
print(STDERR, "fatal error on ", myid(), ": ")
display_error(e, catch_backtrace())
if (iderr < 1)
print(STDERR, "Socket from unknown remote worker in worker ", myid())
else
werr = worker_from_id(iderr)
oldstate = werr.state
set_worker_state(werr, W_TERMINATED)

# If error occurred talking to pid 1, commit harakiri
if iderr == 1
if isopen(w_stream)
print(STDERR, "fatal error on ", myid(), ": ")
display_error(e, catch_backtrace())
end
exit(1)
end
exit(1)
end

# Will treat any exception as death of node and cleanup
# since currently we do not have a mechanism for workers to reconnect
# to each other on unhandled errors
deregister_worker(iderr)
# Will treat any exception as death of node and cleanup
# since currently we do not have a mechanism for workers to reconnect
# to each other on unhandled errors
deregister_worker(iderr)
end

if isopen(r_stream) close(r_stream) end
if isopen(w_stream) close(w_stream) end

if (myid() == 1)
if (myid() == 1) && (iderr > 1)
if oldstate != W_TERMINATING
println(STDERR, "Worker $iderr terminated.")
rethrow(e)
Expand Down
29 changes: 29 additions & 0 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,36 @@ DLLEXPORT int jl_tcp_quickack(uv_tcp_t *handle, int on)
}
return 0;
}

#endif


// Enable SO_REUSEPORT on the socket descriptor held by `handle`.
// Returns 0 on success, and -1 when setsockopt() fails or when SO_REUSEPORT
// is not defined by the platform headers this file was compiled against.
DLLEXPORT int jl_tcp_reuseport(uv_tcp_t *handle)
{
#if !defined(SO_REUSEPORT)
    return -1;
#else
    int enable = 1;
    int sockfd = handle->io_watcher.fd;
    return setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) != 0 ? -1 : 0;
#endif
}

// Report the local IPv4 address and port that `handle` is bound to.
//
// On success returns 0 and stores the address in `*ip` and the port in
// `*port`, both converted to host byte order.
// Returns -1, leaving `*ip` and `*port` untouched, when uv_tcp_getsockname()
// fails or when the socket's address family is not AF_INET.
DLLEXPORT int jl_tcp_getsockname_v4(uv_tcp_t *handle, uint32_t * ip, uint16_t * port)
{
    struct sockaddr_in name;
    int len = sizeof(name);
    if (uv_tcp_getsockname(handle, (struct sockaddr *)&name, &len)) {
        return -1;
    }

    // A non-IPv4 socket would yield a (possibly truncated) address of another
    // family here; reinterpreting it as sockaddr_in would report garbage as
    // success, so reject it explicitly.
    if (name.sin_family != AF_INET) {
        return -1;
    }

    *ip = ntohl(name.sin_addr.s_addr);
    *port = ntohs(name.sin_port);
    return 0;
}

#ifndef _OS_WINDOWS_

Expand Down

0 comments on commit 40c622b

Please sign in to comment.