From bcbff68d9fe880083b00a4cb9b887abafecd0b14 Mon Sep 17 00:00:00 2001 From: Olivier Bazoud Date: Thu, 16 Feb 2017 13:53:15 +0100 Subject: [PATCH] Refactoring metrics-consumer to remove shell/jvm execution --- CHANGELOG.md | 5 +++ README.md | 6 +-- bin/metrics-consumer.rb | 83 +++++++++++++++++++---------------------- 3 files changed, 46 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bbce0f..a9899dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ This CHANGELOG follows the format listed at [Keep A Changelog](http://keepachang ### Fixed ### Changed +## [Unreleased] + +### Breaking Changes +- Refactoring metrics-consumer to remove shell/jvm execution + ## [0.8.1] ### Fixed diff --git a/README.md b/README.md index 899cc2c..7d8698e 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,8 @@ [Installation and Setup](http://sensu-plugins.io/docs/installation_instructions.html) -Kafka installation is required where checks will be made. - -Kafka version: 0.8.2.x +Tested on: +* Zookeeper version: 3.4.6 +* Kafka version: 0.8.2.x Note: In addition to the standard installation requirements the installation of this gem will require compiling the nokogiri gem. Due to this you'll need certain developmemnt packages on your system. On Ubuntu systems install build-essential, libxml2-dev and zlib1g-dev. On CentOS install gcc and zlib-devel. 
diff --git a/bin/metrics-consumer.rb b/bin/metrics-consumer.rb index d9a693d..5ba1881 100755 --- a/bin/metrics-consumer.rb +++ b/bin/metrics-consumer.rb @@ -26,6 +26,10 @@ require 'sensu-plugin/metric/cli' +require 'json' +require 'poseidon' +require 'zookeeper' + class ConsumerOffsetMetrics < Sensu::Plugin::Metric::CLI::Graphite option :scheme, description: 'Metric naming scheme, text to prepend to metric', @@ -39,12 +43,6 @@ class ConsumerOffsetMetrics < Sensu::Plugin::Metric::CLI::Graphite long: '--group NAME', required: true - option :kafka_home, - description: 'Kafka home', - short: '-k NAME', - long: '--kafka-home NAME', - default: '/opt/kafka' - option :topic, description: 'Comma-separated list of consumer topics', short: '-t NAME', @@ -62,51 +60,46 @@ class ConsumerOffsetMetrics < Sensu::Plugin::Metric::CLI::Graphite long: '--zookeeper NAME', default: 'localhost:2181' - # read the output of a command - # @param cmd [String] the command to read the output from - def read_lines(cmd) - IO.popen(cmd + ' 2>&1') do |child| - child.read.split("\n") - end - end + def run + z = Zookeeper.new(config[:zookeeper]) - # create a hash from the output of each line of a command - # @param line [String] - # @param cols - def line_to_hash(line, *cols) - Hash[cols.zip(line.strip.split(/\s+/, cols.size))] - end + group = config[:group] + topics = kafka_topics(z, group) + + critical 'Could not find topics' if topics.empty? 
+ + consumers = {} + topics.each do |topic| + consumers[topic] = [] + + topics_partitions(z, topic).each do |partition| + leader = leader_broker(z, topic, partition) + consumer = Poseidon::PartitionConsumer.new('CheckConsumerLag', leader['host'], leader['port'], topic, partition, :latest_offset) + logsize = consumer.next_offset - # run command and return a hash from the output - # @param cmd [String] - def run_cmd(cmd) - read_lines(cmd).drop(1).map do |line| - line_to_hash(line, :group, :topic, :pid, :offset, :logsize, :lag, :owner) + offset = consumer_offset(z, group, topic, partition) + + lag = logsize - offset + consumers[topic].push(partition: partition, logsize: logsize, offset: offset, lag: lag) + end end - end - def run - begin - kafka_run_class = "#{config[:kafka_home]}/bin/kafka-run-class.sh" - unknown "Can not find #{kafka_run_class}" unless File.exist?(kafka_run_class) - - cmd = "#{kafka_run_class} kafka.tools.ConsumerOffsetChecker --group #{config[:group]} --zookeeper #{config[:zookeeper]}" - cmd += " --topic #{config[:topic]}" if config[:topic] - - results = run_cmd(cmd) - - [:offset, :logsize, :lag].each do |field| - sum_by_group = results.group_by { |h| h[:topic] }.map do |k, v| - Hash[k, v.inject(0) { |a, e| a + e[field].to_i }] - end - sum_by_group.delete_if { |x| config[:topic_excludes].include?(x.keys[0]) } if config[:topic_excludes] - sum_by_group.each do |x| - output "#{config[:scheme]}.#{config[:group]}.#{x.keys[0]}.#{field}", x.values[0] - end + [:offset, :logsize, :lag].each do |field| + consumers.each do |k, v| + critical "Topic #{k} has #{field} < 0 '#{v[field]}'" unless v.select { |w| w[field].to_i < 0 }.empty? 
+ end + end + + [:offset, :logsize, :lag].each do |field| + sum_by_group = consumers.map do |k, v| + Hash[k, v.inject(0) { |a, e| a + e[field].to_i }] + end + sum_by_group.each do |x| + output "#{config[:scheme]}.#{config[:group]}.#{x.keys[0]}.#{field}", x.values[0] end - rescue => e - critical "Error: exception: #{e}" end ok + rescue => e + critical "Error: exception: #{e}" end end