forked from statsd/statsd
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 122964c
Showing
4 changed files
with
269 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
var fs = require('fs') | ||
, sys = require('sys') | ||
|
||
var Configurator = function (file) { | ||
|
||
var self = this; | ||
var config = {}; | ||
var oldConfig = {}; | ||
|
||
this.updateConfig = function () { | ||
sys.log('reading config file: ' + file); | ||
|
||
fs.readFile(file, function (err, data) { | ||
if (err) { throw err; } | ||
old_config = self.config; | ||
|
||
self.config = process.compile('config = ' + data, file); | ||
self.emit('configChanged', self.config); | ||
}); | ||
}; | ||
|
||
this.updateConfig(); | ||
|
||
fs.watchFile(file, function (curr, prev) { | ||
if (curr.ino != prev.ino) { self.updateConfig(); } | ||
}); | ||
}; | ||
|
||
sys.inherits(Configurator, process.EventEmitter); | ||
|
||
exports.Configurator = Configurator; | ||
|
||
exports.configFile = function(file, callbackFunc) { | ||
var config = new Configurator(file); | ||
config.on('configChanged', function() { | ||
callbackFunc(config.config, config.oldConfig); | ||
}); | ||
}; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
graphitePort: 2003 | ||
, graphiteHost: "graphite.host.com" | ||
, port: 8125 | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
<?php | ||
|
||
/** | ||
* Sends statistics to the stats daemon over UDP | ||
* | ||
**/ | ||
|
||
class StatsD { | ||
|
||
/** | ||
* Log timing information | ||
* | ||
* @param string $stats The metric to in log timing info for. | ||
* @param float $time The ellapsed time (ms) to log | ||
* @param float|1 $sampleRate the rate (0-1) for sampling. | ||
**/ | ||
public static function timing($stat, $time, $sampleRate=1) { | ||
StatsD::send(array($stat => "$time|ms"), $sampleRate); | ||
} | ||
|
||
/** | ||
* Increments one or more stats counters | ||
* | ||
* @param string|array $stats The metric(s) to increment. | ||
* @param float|1 $sampleRate the rate (0-1) for sampling. | ||
* @return boolean | ||
**/ | ||
public static function increment($stats, $sampleRate=1) { | ||
StatsD::updateStats($stats, 1, $sampleRate); | ||
} | ||
|
||
/** | ||
* Decrements one or more stats counters. | ||
* | ||
* @param string|array $stats The metric(s) to decrement. | ||
* @param float|1 $sampleRate the rate (0-1) for sampling. | ||
* @return boolean | ||
**/ | ||
public static function decrement($stats, $sampleRate=1) { | ||
StatsD::updateStats($stats, -1, $sampleRate); | ||
} | ||
|
||
/** | ||
* Updates one or more stats counters by arbitrary amounts. | ||
* | ||
* @param string|array $stats The metric(s) to update. Should be either a string or array of metrics. | ||
* @param int|1 $delta The amount to increment/decrement each metric by. | ||
* @param float|1 $sampleRate the rate (0-1) for sampling. | ||
* @return boolean | ||
**/ | ||
public static function updateStats($stats, $delta=1, $sampleRate=1) { | ||
if (!is_array($stats)) { $stats = array($stats); } | ||
$data = array(); | ||
foreach($stats as $stat) { | ||
$data[$stat] = "$delta|c"; | ||
} | ||
|
||
StatsD::send($data, $sampleRate); | ||
} | ||
|
||
/* | ||
* Squirt the metrics over UDP | ||
**/ | ||
public static function send($data, $sampleRate=1) { | ||
$config = Config::getInstance(); | ||
if (! $config->isEnabled("statsd")) { return; } | ||
|
||
// sampling | ||
$sampledData = array(); | ||
|
||
if ($sampleRate < 1) { | ||
foreach ($data as $stat => $value) { | ||
if ((mt_rand() / mt_getrandmax()) <= $sampleRate) { | ||
$sampledData[$stat] = "$value|@$sampleRate"; | ||
} | ||
} | ||
} else { | ||
$sampledData = $data; | ||
} | ||
|
||
if (empty($sampledData)) { return; } | ||
|
||
// Wrap this in a try/catch - failures in any of this should be silently ignored | ||
try { | ||
$host = $config->getConfig("statsd.host"); | ||
$port = $config->getConfig("statsd.port"); | ||
$fp = fsockopen("udp://$host", $port, $errno, $errstr); | ||
if (! $fp) { return; } | ||
foreach ($sampledData as $stat => $value) { | ||
fwrite($fp, "$stat:$value"); | ||
} | ||
fclose($fp); | ||
} catch (Exception $e) { | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
var dgram = require('dgram') | ||
, sys = require('sys') | ||
, net = require('net') | ||
, config = require('./config') | ||
|
||
var counters = {}; | ||
var timers = {}; | ||
var debugInt, flushInt, server; | ||
|
||
config.configFile(process.argv[2], function (config, oldConfig) { | ||
if (! config.debug && debugInt) { | ||
clearInterval(debugInt); | ||
debugInt = false; | ||
} | ||
|
||
if (config.debug) { | ||
if (debugInt !== undefined) { clearInterval(debugInt); } | ||
debugInt = setInterval(function () { | ||
sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers)); | ||
}, config.debugInterval || 10000); | ||
} | ||
|
||
if (server === undefined) { | ||
server = dgram.createSocket('udp4', function (msg, rinfo) { | ||
if (config.dumpMessages) { sys.log(msg.toString()); } | ||
var bits = msg.toString().split(':'); | ||
var key = bits.shift() | ||
.replace(/\s+/g, '_') | ||
.replace(/\//g, '-') | ||
.replace(/[^a-zA-Z_\-0-9\.]/g, ''); | ||
|
||
if (bits.length == 0) { | ||
bits.push("1"); | ||
} | ||
|
||
for (var i = 0; i < bits.length; i++) { | ||
var sampleRate = 1; | ||
var fields = bits[i].split("|"); | ||
if (fields[1] == "ms") { | ||
if (! timers[key]) { | ||
timers[key] = []; | ||
} | ||
timers[key].push(Number(fields[0] || 0)); | ||
} else { | ||
if (fields[2] && fields[2].match(/^@([\d\.]+)/)) { | ||
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]); | ||
} | ||
if (! counters[key]) { | ||
counters[key] = 0; | ||
} | ||
counters[key] += Number(fields[0] || 1) * (1 / sampleRate); | ||
} | ||
} | ||
}); | ||
|
||
server.bind(config.port || 8125); | ||
|
||
var flushInterval = Number(config.flushInterval || 10000); | ||
|
||
flushInt = setInterval(function () { | ||
var statString = ''; | ||
var ts = Math.round(new Date().getTime() / 1000); | ||
var numStats = 0; | ||
var key; | ||
|
||
for (key in counters) { | ||
var value = counters[key] / (flushInterval / 1000); | ||
var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n"; | ||
statString += message; | ||
counters[key] = 0; | ||
|
||
numStats += 1; | ||
} | ||
|
||
for (key in timers) { | ||
if (timers[key].length > 0) { | ||
var pctThreshold = config.percentThreshold || 90; | ||
var values = timers[key].sort(function (a,b) { return a-b; }); | ||
var count = values.length; | ||
var min = values[0]; | ||
var max = values[count - 1]; | ||
|
||
var mean = min; | ||
var maxAtThreshold = max; | ||
|
||
if (count > 1) { | ||
var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count); | ||
var numInThreshold = count - thresholdIndex; | ||
values = values.slice(0, numInThreshold); | ||
maxAtThreshold = values[numInThreshold - 1]; | ||
|
||
// average the remaining timings | ||
var sum = 0; | ||
for (var i = 0; i < numInThreshold; i++) { | ||
sum += values[i]; | ||
} | ||
|
||
mean = sum / numInThreshold; | ||
} | ||
|
||
timers[key] = []; | ||
|
||
var message = ""; | ||
message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n"; | ||
message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n"; | ||
message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n"; | ||
message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n"; | ||
message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n"; | ||
statString += message; | ||
|
||
numStats += 1; | ||
} | ||
} | ||
|
||
statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n"; | ||
|
||
var graphite = net.createConnection(config.graphitePort, config.graphiteHost); | ||
|
||
graphite.on('connect', function() { | ||
this.write(statString); | ||
this.end(); | ||
}); | ||
|
||
}, flushInterval); | ||
} | ||
|
||
}); | ||
|