Skip to content

Commit

Permalink
Merge pull request riemann#706 from pradeepchhetri/fix-705
Browse files Browse the repository at this point in the history
Add ChannelOption/SO_BACKLOG to TCP server
  • Loading branch information
pyr authored Jul 28, 2016
2 parents 9d522ce + 3bdb638 commit dcd2aad
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/riemann/transport/tcp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
ChannelInboundHandlerAdapter which calls (handler core stats
channel-handler-context message) for each received message.
To prevent Netty outbound buffer from filling up in the case of clients not
To prevent Netty outbound buffer from filling up in the case of clients not
reading ack messages, we close the channel when it becomes unwritable. Clients
should then be ready to reconnect if need be as they will receive some form
of exception in this case.
Expand All @@ -72,7 +72,7 @@
(warn "forcefully closing connection from " (.remoteAddress channel)
". Client might be not reading acks fast enough or network is broken")
(.close channel))))

(channelRead [^ChannelHandlerContext ctx ^Object message]
(try
(handler @core stats ctx message)
Expand Down Expand Up @@ -115,6 +115,7 @@

(defrecord TCPServer [^String host
^int port
^int so-backlog
equiv
^ChannelGroup channel-group
^ChannelInitializer initializer
Expand Down Expand Up @@ -157,6 +158,7 @@
(.channel (:channel netty-implementation))
(.option ChannelOption/SO_REUSEADDR true)
(.option ChannelOption/TCP_NODELAY true)
(.option ChannelOption/SO_BACKLOG so-backlog)
(.childOption ChannelOption/SO_REUSEADDR true)
(.childOption ChannelOption/TCP_NODELAY true)
(.childOption ChannelOption/SO_KEEPALIVE true)
Expand Down Expand Up @@ -243,10 +245,13 @@
(gen-tcp-handler core stats channel-group tcp-handler))))

(defn tcp-server
"Create a new TCP server. Doesn't start until (service/start!). Options:
"Create a new TCP server. Doesn't start until (service/start!).
Options:
:host The host to listen on (default 127.0.0.1).
:port The port to listen on. (default 5554 with TLS, or 5555 std)
:core An atom used to track the active core for this server
:core An atom used to track the active core for this server.
:so-backlog The maximum queue length for incoming tcp connections (default 50).
:channel-group A global channel group used to track all connections.
:initializer A ChannelInitializer for creating new pipelines.
Expand All @@ -262,6 +267,7 @@
stats (metrics/rate+latency)
host (get opts :host "127.0.0.1")
port (get opts :port (if (:tls? opts) 5554 5555))
so-backlog (get opts :so-backlog 50)
channel-group (get opts :channel-group
(channel-group
(str "tcp-server " host ":" port)))
Expand All @@ -285,5 +291,5 @@
; A standard handler
(initializer core stats channel-group nil)))]

(TCPServer. host port equiv channel-group initializer core stats
(TCPServer. host port so-backlog equiv channel-group initializer core stats
(atom nil)))))

0 comments on commit dcd2aad

Please sign in to comment.