Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pystatsd/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from .ganglia import Ganglia
from .gmetric import Gmetric
from .graphite import Graphite
from .console import Console

def create_instance(transport, options):
if transport == 'graphite':
return Graphite(options)
elif transport == 'ganglia':
return Ganglia(options)
elif transport == 'ganglia-gmetric':
return Gmetric(options)
elif transport == 'console':
return Console(options)
else:
return None
22 changes: 22 additions & 0 deletions pystatsd/backends/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging

log = logging.getLogger(__name__)

class Console(object):
def __init__(self, options={}):
print("Console started")

def init(self, cfg):
self.debug = cfg.get('debug')
self.flush_interval = cfg.get('flush_interval')

def flush(self, timestamp, metrics):
for k, v in metrics['counters'].items():
print("%s => count=%s" % (k, v))

for k, v in metrics['gauges'].items():
print("%s => value=%s" % (k, v))

for k, v in metrics['timers'].items():
print("%s => lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s"
% (k, v['min'], v['mean'], v['max'], v['pct_threshold'], v['max_threshold'], v['count']))
46 changes: 46 additions & 0 deletions pystatsd/backends/ganglia.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging

from .. import gmetric

log = logging.getLogger(__name__)

class Ganglia(object):
def __init__(self, options):
self.host = options.get('ganglia_host', 'localhost')
self.port = options.get('ganglia_port', 8649)
self.protocol = options.get('ganglia_protocol', 'udp')
self.spoof_host = options.get('ganglia_spoof_host', 'statsd:statsd')

def init(self, options):
self.debug = options.get('debug')
self.flush_interval = options.get('flush_interval')
self.dmax = int(self.flush_interval * 1.2)

def flush(self, timestamp, metrics):
g = gmetric.Gmetric(self.host, self.port, self.protocol)

for k, v in metrics['counters'].items():
# We put counters in _counters group. Underscore is to make sure counters show up
# first in the GUI. Change below if you disagree
g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.spoof_host)

for k, v in metrics['gauges'].items():
g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.spoof_host)

for k, v in metrics['timers'].items():
# We are gonna convert all times into seconds, then let rrdtool
# add proper SI unit. This avoids things like 3521 k ms which
# is 3.521 seconds. What group should these metrics be in. For the
# time being we'll set it to the name of the key
group = k
g.send(k + "_min", v['min'] / 1000, "double", "seconds", "both", 60,
self.dmax, group, self.spoof_host)
g.send(k + "_mean", v['mean'] / 1000, "double", "seconds", "both", 60,
self.dmax, group, self.spoof_host)
g.send(k + "_max", v['max'] / 1000, "double", "seconds", "both", 60,
self.dmax, group, self.spoof_host)
g.send(k + "_count", v['count'], "double", "count", "both", 60, self.dmax,
group, self.spoof_host)
g.send(k + "_" + str(v['pct_threshold']) + "pct", v['max_threshold'] / 1000,
"double", "seconds", "both", 60, self.dmax, group,
self.spoof_host)
35 changes: 35 additions & 0 deletions pystatsd/backends/gmetric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging

from subprocess import call
from .ganglia import Ganglia

log = logging.getLogger(__name__)

class Gmetric(object):
def __init__(self, options):
self.gmetric_exec = options.get('gmetric_exec', '/usr/bin/gmetric')
self.gmetric_options = options.get('gmetric_options', '-d')

def init(self, options):
self.debug = options.get('debug')
self.flush_interval = options.get('flush_interval')

def flush(self, timestamp, metrics):
for k, v in metrics['counters'].items():
self.send(k, v, "_counters", "count")

for k, v in metrics['gauges'].items():
self.send(k, v, "_gauges", "gauge")

for k, v in metrics['timers'].items():
# We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like
# 3521 k ms which is 3.521 seconds
group = k
self.send(k + "_mean", v['mean'] / 1000, group, "seconds")
self.send(k + "_min", v['min'] / 1000 , group, "seconds")
self.send(k + "_max", v['max'] / 1000, group, "seconds")
self.send(k + "_count", v['count'] , group, "count")
self.send(k + "_" + str(v['pct_threshold']) + "pct", v['max_threshold'] / 1000, group, "seconds")

def send(self, k, v, group, units):
call([self.gmetric_exec, self.gmetric_options, "-u", units, "-g", group, "-t", "double", "-n", k, "-v", str(v) ])
64 changes: 64 additions & 0 deletions pystatsd/backends/graphite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import socket
import logging


TIMER_MSG = '''%(prefix)s.%(key)s.lower %(min)s %(ts)s
%(prefix)s.%(key)s.count %(count)s %(ts)s
%(prefix)s.%(key)s.mean %(mean)s %(ts)s
%(prefix)s.%(key)s.upper %(max)s %(ts)s
%(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s
'''

log = logging.getLogger(__name__)

class Graphite(object):
def __init__(self, options={}):
self.host = options.get('graphite_host', 'localhost')
self.port = options.get('graphite_port', 2003)
self.counters_prefix = options.get('counters_prefix', 'stats')
self.timers_prefix = options.get('timers_prefix', 'stats.timers')
self.global_prefix = options.get('global_prefix', None)

def init(self, cfg):
self.debug = cfg.get('debug')
self.flush_interval = cfg.get('flush_interval')

def flush(self, timestamp, metrics):
stat_string = ''
stats = 0

for k, v in metrics['counters'].items():
msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp)
stat_string += msg
stats += 1

for k, v in metrics['gauges'].items():
msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp)
stat_string += msg
stats += 1

for k, v in metrics['timers'].items():
v.update({'prefix': self.timers_prefix, 'key': k, 'ts': timestamp})
stat_string += TIMER_MSG % v
stats += 1

stat_string += "statsd.numStats %s %d\n" % (stats, timestamp)
self._send_metrics(stat_string)

def _send_metrics(self, stat_string):
# Prepend stats with Hosted Graphite API key if necessary
if self.global_prefix:
stat_string = '\n'.join([
'%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1]
])

graphite = socket.socket()

try:
graphite.connect((self.host, self.port))
graphite.sendall(bytes(bytearray(stat_string, "utf-8")))
graphite.close()
except socket.error as e:
log.error("Error communicating with Graphite: %s" % e)
if self.debug:
print("Error communicating with Graphite: %s" % e)
Loading