From 3bdb6380250d77f966b039b922b3489c937a0b59 Mon Sep 17 00:00:00 2001 From: Pradeep Chhetri Date: Tue, 12 Jul 2016 15:24:31 +0530 Subject: [PATCH] Add ChannelOption/SO_BACKLOG to TCP server --- src/riemann/transport/tcp.clj | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/riemann/transport/tcp.clj b/src/riemann/transport/tcp.clj index a7109fd46..e1ccc37e4 100644 --- a/src/riemann/transport/tcp.clj +++ b/src/riemann/transport/tcp.clj @@ -54,7 +54,7 @@ ChannelInboundHandlerAdapter which calls (handler core stats channel-handler-context message) for each received message. - To prevent Netty outbound buffer from filling up in the case of clients not + To prevent Netty outbound buffer from filling up in the case of clients not reading ack messages, we close the channel when it becomes unwritable. Clients should then be ready to reconnect if need be as they will receive some form of exception in this case. @@ -72,7 +72,7 @@ (warn "forcefully closing connection from " (.remoteAddress channel) ". Client might be not reading acks fast enough or network is broken") (.close channel)))) - + (channelRead [^ChannelHandlerContext ctx ^Object message] (try (handler @core stats ctx message) @@ -115,6 +115,7 @@ (defrecord TCPServer [^String host ^int port + ^int so-backlog equiv ^ChannelGroup channel-group ^ChannelInitializer initializer @@ -157,6 +158,7 @@ (.channel (:channel netty-implementation)) (.option ChannelOption/SO_REUSEADDR true) (.option ChannelOption/TCP_NODELAY true) + (.option ChannelOption/SO_BACKLOG so-backlog) (.childOption ChannelOption/SO_REUSEADDR true) (.childOption ChannelOption/TCP_NODELAY true) (.childOption ChannelOption/SO_KEEPALIVE true) @@ -243,10 +245,13 @@ (gen-tcp-handler core stats channel-group tcp-handler)))) (defn tcp-server - "Create a new TCP server. Doesn't start until (service/start!). Options: + "Create a new TCP server. Doesn't start until (service/start!). + + Options: :host The host to listen on (default 127.0.0.1). :port The port to listen on. (default 5554 with TLS, or 5555 std) - :core An atom used to track the active core for this server + :core An atom used to track the active core for this server. + :so-backlog The maximum queue length for incoming tcp connections (default 50). :channel-group A global channel group used to track all connections. :initializer A ChannelInitializer for creating new pipelines. @@ -262,6 +267,7 @@ stats (metrics/rate+latency) host (get opts :host "127.0.0.1") port (get opts :port (if (:tls? opts) 5554 5555)) + so-backlog (get opts :so-backlog 50) channel-group (get opts :channel-group (channel-group (str "tcp-server " host ":" port))) @@ -285,5 +291,5 @@ ; A standard handler (initializer core stats channel-group nil)))] - (TCPServer. host port equiv channel-group initializer core stats + (TCPServer. host port so-backlog equiv channel-group initializer core stats (atom nil)))))