diff --git a/config.js b/config.js new file mode 100644 index 00000000..a94ad472 --- /dev/null +++ b/config.js @@ -0,0 +1,39 @@ +var fs = require('fs') + , sys = require('sys') + +var Configurator = function (file) { + + var self = this; + var config = {}; + var oldConfig = {}; + + this.updateConfig = function () { + sys.log('reading config file: ' + file); + + fs.readFile(file, function (err, data) { + if (err) { throw err; } + old_config = self.config; + + self.config = process.compile('config = ' + data, file); + self.emit('configChanged', self.config); + }); + }; + + this.updateConfig(); + + fs.watchFile(file, function (curr, prev) { + if (curr.ino != prev.ino) { self.updateConfig(); } + }); +}; + +sys.inherits(Configurator, process.EventEmitter); + +exports.Configurator = Configurator; + +exports.configFile = function(file, callbackFunc) { + var config = new Configurator(file); + config.on('configChanged', function() { + callbackFunc(config.config, config.oldConfig); + }); +}; + diff --git a/exampleConfig.js b/exampleConfig.js new file mode 100644 index 00000000..7c66c7cc --- /dev/null +++ b/exampleConfig.js @@ -0,0 +1,6 @@ +{ + graphitePort: 2003 +, graphiteHost: "graphite.host.com" +, port: 8125 +} + diff --git a/php-example.php b/php-example.php new file mode 100644 index 00000000..9287d980 --- /dev/null +++ b/php-example.php @@ -0,0 +1,96 @@ + "$time|ms"), $sampleRate); + } + + /** + * Increments one or more stats counters + * + * @param string|array $stats The metric(s) to increment. + * @param float|1 $sampleRate the rate (0-1) for sampling. + * @return boolean + **/ + public static function increment($stats, $sampleRate=1) { + StatsD::updateStats($stats, 1, $sampleRate); + } + + /** + * Decrements one or more stats counters. + * + * @param string|array $stats The metric(s) to decrement. + * @param float|1 $sampleRate the rate (0-1) for sampling. + * @return boolean + **/ + public static function decrement($stats, $sampleRate=1) { + StatsD::updateStats($stats, -1, $sampleRate); + } + + /** + * Updates one or more stats counters by arbitrary amounts. + * + * @param string|array $stats The metric(s) to update. Should be either a string or array of metrics. + * @param int|1 $delta The amount to increment/decrement each metric by. + * @param float|1 $sampleRate the rate (0-1) for sampling. + * @return boolean + **/ + public static function updateStats($stats, $delta=1, $sampleRate=1) { + if (!is_array($stats)) { $stats = array($stats); } + $data = array(); + foreach($stats as $stat) { + $data[$stat] = "$delta|c"; + } + + StatsD::send($data, $sampleRate); + } + + /* + * Squirt the metrics over UDP + **/ + public static function send($data, $sampleRate=1) { + $config = Config::getInstance(); + if (! $config->isEnabled("statsd")) { return; } + + // sampling + $sampledData = array(); + + if ($sampleRate < 1) { + foreach ($data as $stat => $value) { + if ((mt_rand() / mt_getrandmax()) <= $sampleRate) { + $sampledData[$stat] = "$value|@$sampleRate"; + } + } + } else { + $sampledData = $data; + } + + if (empty($sampledData)) { return; } + + // Wrap this in a try/catch - failures in any of this should be silently ignored + try { + $host = $config->getConfig("statsd.host"); + $port = $config->getConfig("statsd.port"); + $fp = fsockopen("udp://$host", $port, $errno, $errstr); + if (! $fp) { return; } + foreach ($sampledData as $stat => $value) { + fwrite($fp, "$stat:$value"); + } + fclose($fp); + } catch (Exception $e) { + } + } +} diff --git a/stats.js b/stats.js new file mode 100644 index 00000000..de1f78e2 --- /dev/null +++ b/stats.js @@ -0,0 +1,128 @@ +var dgram = require('dgram') + , sys = require('sys') + , net = require('net') + , config = require('./config') + +var counters = {}; +var timers = {}; +var debugInt, flushInt, server; + +config.configFile(process.argv[2], function (config, oldConfig) { + if (! config.debug && debugInt) { + clearInterval(debugInt); + debugInt = false; + } + + if (config.debug) { + if (debugInt !== undefined) { clearInterval(debugInt); } + debugInt = setInterval(function () { + sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers)); + }, config.debugInterval || 10000); + } + + if (server === undefined) { + server = dgram.createSocket('udp4', function (msg, rinfo) { + if (config.dumpMessages) { sys.log(msg.toString()); } + var bits = msg.toString().split(':'); + var key = bits.shift() + .replace(/\s+/g, '_') + .replace(/\//g, '-') + .replace(/[^a-zA-Z_\-0-9\.]/g, ''); + + if (bits.length == 0) { + bits.push("1"); + } + + for (var i = 0; i < bits.length; i++) { + var sampleRate = 1; + var fields = bits[i].split("|"); + if (fields[1] == "ms") { + if (! timers[key]) { + timers[key] = []; + } + timers[key].push(Number(fields[0] || 0)); + } else { + if (fields[2] && fields[2].match(/^@([\d\.]+)/)) { + sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]); + } + if (! counters[key]) { + counters[key] = 0; + } + counters[key] += Number(fields[0] || 1) * (1 / sampleRate); + } + } + }); + + server.bind(config.port || 8125); + + var flushInterval = Number(config.flushInterval || 10000); + + flushInt = setInterval(function () { + var statString = ''; + var ts = Math.round(new Date().getTime() / 1000); + var numStats = 0; + var key; + + for (key in counters) { + var value = counters[key] / (flushInterval / 1000); + var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n"; + statString += message; + counters[key] = 0; + + numStats += 1; + } + + for (key in timers) { + if (timers[key].length > 0) { + var pctThreshold = config.percentThreshold || 90; + var values = timers[key].sort(function (a,b) { return a-b; }); + var count = values.length; + var min = values[0]; + var max = values[count - 1]; + + var mean = min; + var maxAtThreshold = max; + + if (count > 1) { + var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count); + var numInThreshold = count - thresholdIndex; + values = values.slice(0, numInThreshold); + maxAtThreshold = values[numInThreshold - 1]; + + // average the remaining timings + var sum = 0; + for (var i = 0; i < numInThreshold; i++) { + sum += values[i]; + } + + mean = sum / numInThreshold; + } + + timers[key] = []; + + var message = ""; + message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n"; + message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n"; + statString += message; + + numStats += 1; + } + } + + statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n"; + + var graphite = net.createConnection(config.graphitePort, config.graphiteHost); + + graphite.on('connect', function() { + this.write(statString); + this.end(); + }); + + }, flushInterval); + } + +}); +