Skip to content

Commit

Permalink
Refactoring metrics-consumer to remove shell/jvm execution
Browse files Browse the repository at this point in the history
  • Loading branch information
obazoud committed Feb 16, 2017
1 parent d58124d commit bcbff68
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 48 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ This CHANGELOG follows the format listed at [Keep A Changelog](http://keepachang
### Fixed
### Changed

## [Unreleased]

### Breaking Changes
- Refactoring metrics-consumer-lag to remove shell/jvm execution

## [0.8.1]

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

[Installation and Setup](http://sensu-plugins.io/docs/installation_instructions.html)

Kafka installation is required where checks will be made.

Kafka version: 0.8.2.x
Tested on:
* Zookeeper version: 3.4.6
* Kafka version: 0.8.2.x

Note: In addition to the standard installation requirements the installation of this gem will require compiling the nokogiri gem. Due to this you'll need certain developmemnt packages on your system. On Ubuntu systems install build-essential, libxml2-dev and zlib1g-dev. On CentOS install gcc and zlib-devel.
83 changes: 38 additions & 45 deletions bin/metrics-consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

require 'sensu-plugin/metric/cli'

require 'json'
require 'poseidon'
require 'zookeeper'

class ConsumerOffsetMetrics < Sensu::Plugin::Metric::CLI::Graphite
option :scheme,
description: 'Metric naming scheme, text to prepend to metric',
Expand All @@ -39,12 +43,6 @@ class ConsumerOffsetMetrics < Sensu::Plugin::Metric::CLI::Graphite
long: '--group NAME',
required: true

option :kafka_home,
description: 'Kafka home',
short: '-k NAME',
long: '--kafka-home NAME',
default: '/opt/kafka'

option :topic,
description: 'Comma-separated list of consumer topics',
short: '-t NAME',
Expand All @@ -62,51 +60,46 @@ class ConsumerOffsetMetrics < Sensu::Plugin::Metric::CLI::Graphite
long: '--zookeeper NAME',
default: 'localhost:2181'

# read the output of a command
# @param cmd [String] the command to read the output from
def read_lines(cmd)
IO.popen(cmd + ' 2>&1') do |child|
child.read.split("\n")
end
end
def run
z = Zookeeper.new(config[:zookeeper])

# create a hash from the output of each line of a command
# @param line [String]
# @param cols
def line_to_hash(line, *cols)
Hash[cols.zip(line.strip.split(/\s+/, cols.size))]
end
group = config[:group]
topics = kafka_topics(z, group)

critical 'Could not found topics' if topics.empty?

consumers = {}
topics.each do |topic|
consumers[topic] = []

topics_partitions(z, topic).each do |partition|
leader = leader_broker(z, topic, partition)
consumer = Poseidon::PartitionConsumer.new('CheckConsumerLag', leader['host'], leader['port'], topic, partition, :latest_offset)
logsize = consumer.next_offset

# run command and return a hash from the output
# @param cmd [String]
def run_cmd(cmd)
read_lines(cmd).drop(1).map do |line|
line_to_hash(line, :group, :topic, :pid, :offset, :logsize, :lag, :owner)
offset = consumer_offset(z, group, topic, partition)

lag = logsize - offset
consumers[topic].push(partition: partition, logsize: logsize, offset: offset, lag: lag)
end
end
end

def run
begin
kafka_run_class = "#{config[:kafka_home]}/bin/kafka-run-class.sh"
unknown "Can not find #{kafka_run_class}" unless File.exist?(kafka_run_class)

cmd = "#{kafka_run_class} kafka.tools.ConsumerOffsetChecker --group #{config[:group]} --zookeeper #{config[:zookeeper]}"
cmd += " --topic #{config[:topic]}" if config[:topic]

results = run_cmd(cmd)

[:offset, :logsize, :lag].each do |field|
sum_by_group = results.group_by { |h| h[:topic] }.map do |k, v|
Hash[k, v.inject(0) { |a, e| a + e[field].to_i }]
end
sum_by_group.delete_if { |x| config[:topic_excludes].include?(x.keys[0]) } if config[:topic_excludes]
sum_by_group.each do |x|
output "#{config[:scheme]}.#{config[:group]}.#{x.keys[0]}.#{field}", x.values[0]
end
[:offset, :logsize, :lag].each do |field|
consumers.each do |k, v|
critical "Topic #{k} has #{field} < 0 '#{v[field]}'" unless v.select { |w| w[field].to_i < 0 }.empty?
end
end

[:offset, :logsize, :lag].each do |field|
sum_by_group = consumers.map do |k, v|
Hash[k, v.inject(0) { |a, e| a + e[field].to_i }]
end
sum_by_group.each do |x|
output "#{config[:scheme]}.#{config[:group]}.#{x.keys[0]}.#{field}", x.values[0]
end
rescue => e
critical "Error: exception: #{e}"
end
ok
rescue => e
critical "Error: exception: #{e}"
end
end

0 comments on commit bcbff68

Please sign in to comment.