Skip to content

Commit

Permalink
Refactoring check-consumer-lag to remove shell/jvm execution
Browse files Browse the repository at this point in the history
  • Loading branch information
obazoud committed Feb 15, 2017
1 parent 6b148c2 commit 8a9ed21
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 68 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
language: ruby

cache:
- bundler

install:
- bundle install

rvm:
- 2.1
- 2.2.1
- 2.3.0

script:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ This CHANGELOG follows the format listed at [Keep A Changelog](http://keepachang
### Fixed
### Changed

## [Unreleased]

### Breaking Changes
- Refactoring check-consumer-lag to remove shell/jvm execution

## [0.7.0]

### Added
Expand Down
124 changes: 58 additions & 66 deletions bin/check-consumer-lag.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,118 +27,110 @@

require 'sensu-plugin/check/cli'

require 'celluloid-io'
require 'json'
require 'poseidon'
require 'zookeeper'

module Poseidon
class Connection
TCPSocket = Celluloid::IO::TCPSocket
end
end

class ConsumerLagCheck < Sensu::Plugin::Check::CLI
option :group,
description: 'Consumer group',
short: '-g NAME',
long: '--group NAME',
required: true

option :kafka_home,
description: 'Kafka home',
short: '-k NAME',
long: '--kafka-home NAME',
default: '/opt/kafka'

option :topic_excludes,
description: 'Excludes consumer topics',
short: '-e NAME',
long: '--topic-excludes NAME',
proc: proc { |a| a.split(',') }

option :autolist,
description: 'Auto list topics',
short: '-a VALUE',
long: '--auto-list VALUE',
boolean: true,
default: true

option :zookeeper,
description: 'ZooKeeper connect string',
short: '-z NAME',
long: '--zookeeper NAME',
default: 'localhost:2181'

option :warning_over,
description: 'Warning if metric statistics is over specified value.',
description: 'Warning if lag is over specified value.',
short: '-W N',
long: '--warning-over N'

option :critical_over,
description: 'Critical if metric statistics is over specified value.',
description: 'Critical if lag is over specified value.',
short: '-C N',
long: '--critical-over N'

option :warning_under,
description: 'Warning if metric statistics is under specified value.',
description: 'Warning if lag is under specified value.',
short: '-w N',
long: '--warning-under N'

option :critical_under,
description: 'Critical if metric statistics is under specified value.',
description: 'Critical if lag is under specified value.',
short: '-c N',
long: '--critical-under N'

# read the output of a command
# @param cmd [String] the command to read the output from
def read_lines(cmd)
IO.popen(cmd + ' 2>&1') do |child|
child.read.split("\n")
end
def kafka_topics(zk, group)
zk.get_children(path: "/consumers/#{group}/owners")[:children].sort
end

# create a hash from the output of each line of a command
# @param line [String]
# @param cols
def line_to_hash(line, *cols)
Hash[cols.zip(line.strip.split(/\s+/, cols.size))]
def topics_partitions(zk, topic)
JSON.parse(zk.get(path: "/brokers/topics/#{topic}")[:data])['partitions'].keys.map(&:to_i).sort
end

# run command and return a hash from the output
# @param cmd [String]
def run_offset(cmd)
read_lines(cmd).drop(1).map do |line|
line_to_hash(line, :group, :topic, :pid, :offset, :logsize, :lag, :owner)
end
def leader_broker(zk, topic, partition)
state = zk.get(path: "/brokers/topics/#{topic}/partitions/#{partition}/state")
leader = JSON.parse(state[:data])['leader']
JSON.parse(zk.get(path: "/brokers/ids/#{leader}")[:data])
end

# run command and return a hash from the output
# @param cmd [String]
def run_topics(cmd)
topics = []
read_lines(cmd).map do |line|
if !line.include?('__consumer_offsets') && !line.include?('marked for deletion')
topics.push(line)
end
end
topics
def consumer_offset(zk, group, topic, partition)
zk.get(path: "/consumers/#{group}/offsets/#{topic}/#{partition}")[:data].to_i
end

def partition_owner(zk, group, partition)
zk.get(path: "/consumers/#{group}/offsets/#{owners}/#{partition}")[:data]
end

def run
kafka_run_class = "#{config[:kafka_home]}/bin/kafka-run-class.sh"
unknown "Can not find #{kafka_run_class}" unless File.exist?(kafka_run_class)

topics_to_read = []
if config[:autolist].to_s == 'true'
cmd_topics = "#{kafka_run_class} kafka.admin.TopicCommand --zookeeper #{config[:zookeeper]} --list"
topics_to_read = run_topics(cmd_topics)
topics_to_read.delete_if { |x| config[:topic_excludes].include?(x) } if config[:topic_excludes]
end
z = Zookeeper.new(config[:zookeeper])

group = config[:group]
topics = kafka_topics(z, group)

critical 'Could not found topics' if topics.empty?

consumers = {}
topics.each do |topic|
consumers[topic] = {}

cmd_offset = "#{kafka_run_class} kafka.tools.ConsumerOffsetChecker --group #{config[:group]} --zookeeper #{config[:zookeeper]}"
cmd_offset += " --topic #{topics_to_read.join(',')}" unless topics_to_read.empty?
topics_partitions(z, topic).each do |partition|
owner = partition_owner(z, group, partition)

topics = run_offset(cmd_offset).group_by { |h| h[:topic] }
leader = leader_broker(z, topic, partition)
consumer = Poseidon::PartitionConsumer.new('CheckConsumerLag', leader['host'], leader['port'], topic, partition, :latest_offset)
logsize = consumer.next_offset

critical 'Could not found topics/partitions' if topics.empty?
offset = consumer_offset(z, group, topic, partition)

lag = logsize - offset

consumers[topic][:partition] = partition
consumers[topic][:logsize] = logsize
consumers[topic][:offset] = offset
consumers[topic][:lag] = lag
consumers[topic][:owner] = owner
end
end

[:offset, :logsize, :lag].each do |field|
topics.map do |k, v|
consumers.map do |k, v|
critical "Topic #{k} has partitions with #{field} < 0" unless v.select { |w| w[field].to_i < 0 }.empty?
end
end

topics.map do |k, v|
consumers.map do |k, v|
critical "Topic #{k} has partitions with no owner" unless v.select { |w| w[:owner] == 'none' }.empty?
end

Expand All @@ -165,7 +157,7 @@ def run
end
when :under
if min_lag < threshold.to_i
msg = "Topics `#{min_topics}` for the group `#{config[:group]}` lag: #{min_lag} (<= #{threshold})"
msg = "Topics `#{min_topics}` for the group `#{config[:group]}` lag: #{min_lag} (<= #{threshold})"
send severity, msg
end
end
Expand Down
2 changes: 2 additions & 0 deletions sensu-plugins-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Gem::Specification.new do |s|
s.test_files = s.files.grep(%r{^(test|spec|features)/})
s.version = SensuPluginsKafka::Version::VER_STRING

s.add_runtime_dependency 'celluloid-io', '~> 0.17.3 '
s.add_runtime_dependency 'poseidon', '0.0.5'
s.add_runtime_dependency 'sensu-plugin', '~> 1.3'
s.add_runtime_dependency 'zookeeper', '~> 1.4.11'

Expand Down

0 comments on commit 8a9ed21

Please sign in to comment.