Skip to content

Commit

Permalink
add zeromq input
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed Aug 7, 2012
1 parent 61e0898 commit 887a043
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 4 deletions.
10 changes: 9 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PATH
statsdserver (0.3)
amqp
bundler
em-zeromq
eventmachine
parseconfig
sysexits
Expand All @@ -20,7 +21,14 @@ GEM
amq-protocol (>= 0.9.4)
eventmachine
diff-lcs (1.1.3)
eventmachine (0.12.10)
em-zeromq (0.3.0)
eventmachine (= 1.0.0.beta.4)
ffi (>= 1.0.0)
ffi-rzmq (= 0.9.3)
eventmachine (1.0.0.beta.4)
ffi (1.1.4)
ffi-rzmq (0.9.3)
ffi
parseconfig (1.0.2)
rspec (2.11.0)
rspec-core (~> 2.11.0)
Expand Down
5 changes: 4 additions & 1 deletion etc/statsd.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
daemonize = false
inputs = udp
inputs = udp, zeromq
outputs = stdout, tcp
flush_interval = 5
prefix = stats
Expand All @@ -9,6 +9,9 @@ prefix = stats
bind = 127.0.0.1
port = 8125

[input:zeromq]
bind = tcp://127.0.0.1:8125

[output:tcp]
host = 127.0.0.1
port = 2003
14 changes: 12 additions & 2 deletions lib/statsdserver.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "logger"
require "statsdserver/input/udp"
require "statsdserver/input/zeromq"
require "statsdserver/stats"

# Hack because the latest amqp gem uses String#bytesize, and not everyone
Expand Down Expand Up @@ -62,22 +63,31 @@ def run
s.logger = @logger
s.stats = @stats
end # EM.open_datagram_socket
when "zeromq"
s = Input::ZeroMQ.new
s.logger = @logger
s.stats = @stats
$ctx = EM::ZeroMQ::Context.new(1)
sock = $ctx.socket(ZMQ::UPSTREAM, s)
sock.setsockopt(ZMQ::HWM, 100)
sock.bind(config["bind"])
else
@logger.fatal("unknown input #{input.inspect}")
exit EX_DATAERR
end # case input
end # @inputs.each

# start flusher
puts "starting flusher"
EM.add_periodic_timer(@opts[:flush_interval]) do
EM.defer do
#EM.defer do
begin
flush
rescue => e
@logger.warn("trouble flushing: #{$!}")
@logger.debug(e.backtrace.join("\n"))
end
end # EM.defer
#end # EM.defer
end # EM.add_periodic_timer
#EM.stop_event_loop
end # def run
Expand Down
53 changes: 53 additions & 0 deletions lib/statsdserver/input/zeromq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
require "em-zeromq"
require "logger"
require "statsdserver/proto/v1"

class StatsdServer
class Input
class ZeroMQ
attr_accessor :logger,
:stats

public
def initialize
@logger = Logger.new(STDOUT)
end

public
def on_readable(socket, parts)
EM.defer do
parts.each do |part|
str = part.copy_out_string
receive_data(str)
end
end
end

public
def receive_data(packet)
raise "@stats must be set" unless @stats

sep = packet.index(";")
if sep.nil?
@logger.warn("received unversioned update: #{packet}")
return
end

proto_ver = packet[0 .. sep - 1]
payload = packet[sep + 1 .. -1]
case proto_ver
when "1"
begin
StatsdServer::Proto::V1.parse(payload, @stats)
rescue StatsdServer::Proto::ParseError => e
@logger.warn(e.message)
end
else
@logger.warn("unknown protocol version #{proto_ver} in update #{packet}")
return
end

end # def receive_data
end # class Ucp
end # class Input
end # class StatsdServer
1 change: 1 addition & 0 deletions statsd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency("eventmachine")
spec.add_runtime_dependency("parseconfig")
spec.add_runtime_dependency("sysexits")
spec.add_runtime_dependency("em-zeromq")

spec.files = files
spec.require_paths << "lib"
Expand Down

0 comments on commit 887a043

Please sign in to comment.