From 40c622b7e66a88c05d8892222126f1d13851bc62 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Tue, 7 Jul 2015 14:21:39 +0530 Subject: [PATCH] reuse port if possible --- base/managers.jl | 26 ++++++++++++++++++++++++-- base/multi.jl | 37 ++++++++++++++++++++----------------- src/jl_uv.c | 29 +++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/base/managers.jl b/base/managers.jl index 690eeda862c1c..597dc57a71aa2 100644 --- a/base/managers.jl +++ b/base/managers.jl @@ -293,13 +293,35 @@ function connect_w2w(pid::Int, config::WorkerConfig) (s,s) end +const client_port = Ref{Cushort}(0) + +function socket_reuse_port() + s = TCPSocket() + try + client_host = Ref{Cuint}(0) + ccall(:jl_tcp_bind, Int32, + (Ptr{Void}, UInt16, UInt32, Cuint), + s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : ")) + + ccall(:jl_tcp_reuseport, Int32, (Ptr{Void}, ), s.handle) < 0 && throw(SystemError("setsockopt() SO_REUSEPORT : ")) + ccall(:jl_tcp_getsockname_v4, Int32, + (Ptr{Void}, Ref{Cuint}, Ref{Cushort}), + s.handle, client_host, client_port) < 0 && throw(SystemError("getsockname() : ")) + catch e + warn_once("Unable to reuse port : ", e) + # provide a clean new socket + return TCPSocket() + end + return s +end function connect_to_worker(host::AbstractString, port::Integer) # Connect to the loopback port if requested host has the same ipaddress as self. + s = socket_reuse_port() if host == string(LPROC.bind_addr) - s = connect("127.0.0.1", UInt16(port)) + s = connect(s, "127.0.0.1", UInt16(port)) else - s = connect(host, UInt16(port)) + s = connect(s, host, UInt16(port)) end # Avoid calling getaddrinfo if possible - involves a DNS lookup diff --git a/base/multi.jl b/base/multi.jl index f14cad932277d..3a244377ddba1 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -894,29 +894,32 @@ function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream, rr_n end # end of while catch e iderr = worker_id_from_socket(r_stream) - werr = worker_from_id(iderr) - oldstate = werr.state - set_worker_state(werr, W_TERMINATED) - - - # If error occured talking to pid 1, commit harakiri - if iderr == 1 - if isopen(w_stream) - print(STDERR, "fatal error on ", myid(), ": ") - display_error(e, catch_backtrace()) + if (iderr < 1) + print(STDERR, "Socket from unknown remote worker in worker ", myid()) + else + werr = worker_from_id(iderr) + oldstate = werr.state + set_worker_state(werr, W_TERMINATED) + + # If error occured talking to pid 1, commit harakiri + if iderr == 1 + if isopen(w_stream) + print(STDERR, "fatal error on ", myid(), ": ") + display_error(e, catch_backtrace()) + end + exit(1) end - exit(1) - end - # Will treat any exception as death of node and cleanup - # since currently we do not have a mechanism for workers to reconnect - # to each other on unhandled errors - deregister_worker(iderr) + # Will treat any exception as death of node and cleanup + # since currently we do not have a mechanism for workers to reconnect + # to each other on unhandled errors + deregister_worker(iderr) + end if isopen(r_stream) close(r_stream) end if isopen(w_stream) close(w_stream) end - if (myid() == 1) + if (myid() == 1) && (iderr > 1) if oldstate != W_TERMINATING println(STDERR, "Worker $iderr terminated.") rethrow(e) diff --git a/src/jl_uv.c b/src/jl_uv.c index 3fe88630f2e4c..3b5737ae8b9a6 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -655,7 +655,36 @@ DLLEXPORT int jl_tcp_quickack(uv_tcp_t *handle, int on) } return 0; } + +#endif + + +DLLEXPORT int jl_tcp_reuseport(uv_tcp_t *handle) +{ +#if defined(SO_REUSEPORT) + int fd = (handle)->io_watcher.fd; + int yes = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) { + return -1; + } + return 0; +#else + return -1; #endif +} + +DLLEXPORT int jl_tcp_getsockname_v4(uv_tcp_t *handle, uint32_t * ip, uint16_t * port) +{ + struct sockaddr_in name; + int len = sizeof(name); + if (uv_tcp_getsockname(handle, (struct sockaddr *)&name, &len)) { + return -1; + } + + *ip = ntohl(name.sin_addr.s_addr); + *port = ntohs(name.sin_port); + return 0; +} #ifndef _OS_WINDOWS_