From 8a9ed21ff4cc59853e60e8d6c97d51360288decc Mon Sep 17 00:00:00 2001 From: Olivier Bazoud Date: Wed, 15 Feb 2017 23:38:41 +0100 Subject: [PATCH] Refactoring check-consumer-lag to remove shell/jvm execution --- .travis.yml | 5 +- CHANGELOG.md | 5 ++ bin/check-consumer-lag.rb | 124 +++++++++++++++++------------------- sensu-plugins-kafka.gemspec | 2 + 4 files changed, 68 insertions(+), 68 deletions(-) diff --git a/.travis.yml b/.travis.yml index bc5c95e..9880b1a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,12 @@ language: ruby + cache: - bundler + install: - bundle install + rvm: - - 2.1 - - 2.2.1 - 2.3.0 script: diff --git a/CHANGELOG.md b/CHANGELOG.md index fdf3a6e..e152a60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ This CHANGELOG follows the format listed at [Keep A Changelog](http://keepachang ### Fixed ### Changed +## [Unreleased] + +### Breaking Changes +- Refactoring check-consumer-lag to remove shell/jvm execution + ## [0.7.0] ### Added diff --git a/bin/check-consumer-lag.rb b/bin/check-consumer-lag.rb index 5b74926..e0518e3 100755 --- a/bin/check-consumer-lag.rb +++ b/bin/check-consumer-lag.rb @@ -27,6 +27,17 @@ require 'sensu-plugin/check/cli' +require 'celluloid-io' +require 'json' +require 'poseidon' +require 'zookeeper' + +module Poseidon + class Connection + TCPSocket = Celluloid::IO::TCPSocket + end +end + class ConsumerLagCheck < Sensu::Plugin::Check::CLI option :group, description: 'Consumer group', @@ -34,25 +45,6 @@ class ConsumerLagCheck < Sensu::Plugin::Check::CLI long: '--group NAME', required: true - option :kafka_home, - description: 'Kafka home', - short: '-k NAME', - long: '--kafka-home NAME', - default: '/opt/kafka' - - option :topic_excludes, - description: 'Excludes consumer topics', - short: '-e NAME', - long: '--topic-excludes NAME', - proc: proc { |a| a.split(',') } - - option :autolist, - description: 'Auto list topics', - short: '-a VALUE', - long: '--auto-list VALUE', - boolean: true, - default: true - option :zookeeper, description: 'ZooKeeper connect string', short: '-z NAME', @@ -60,85 +52,85 @@ class ConsumerLagCheck < Sensu::Plugin::Check::CLI default: 'localhost:2181' option :warning_over, - description: 'Warning if metric statistics is over specified value.', + description: 'Warning if lag is over specified value.', short: '-W N', long: '--warning-over N' option :critical_over, - description: 'Critical if metric statistics is over specified value.', + description: 'Critical if lag is over specified value.', short: '-C N', long: '--critical-over N' option :warning_under, - description: 'Warning if metric statistics is under specified value.', + description: 'Warning if lag is under specified value.', short: '-w N', long: '--warning-under N' option :critical_under, - description: 'Critical if metric statistics is under specified value.', + description: 'Critical if lag is under specified value.', short: '-c N', long: '--critical-under N' - # read the output of a command - # @param cmd [String] the command to read the output from - def read_lines(cmd) - IO.popen(cmd + ' 2>&1') do |child| - child.read.split("\n") - end + def kafka_topics(zk, group) + zk.get_children(path: "/consumers/#{group}/owners")[:children].sort end - # create a hash from the output of each line of a command - # @param line [String] - # @param cols - def line_to_hash(line, *cols) - Hash[cols.zip(line.strip.split(/\s+/, cols.size))] + def topics_partitions(zk, topic) + JSON.parse(zk.get(path: "/brokers/topics/#{topic}")[:data])['partitions'].keys.map(&:to_i).sort end - # run command and return a hash from the output - # @param cmd [String] - def run_offset(cmd) - read_lines(cmd).drop(1).map do |line| - line_to_hash(line, :group, :topic, :pid, :offset, :logsize, :lag, :owner) - end + def leader_broker(zk, topic, partition) + state = zk.get(path: "/brokers/topics/#{topic}/partitions/#{partition}/state") + leader = JSON.parse(state[:data])['leader'] + JSON.parse(zk.get(path: "/brokers/ids/#{leader}")[:data]) end - # run command and return a hash from the output - # @param cmd [String] - def run_topics(cmd) - topics = [] - read_lines(cmd).map do |line| - if !line.include?('__consumer_offsets') && !line.include?('marked for deletion') - topics.push(line) - end - end - topics + def consumer_offset(zk, group, topic, partition) + zk.get(path: "/consumers/#{group}/offsets/#{topic}/#{partition}")[:data].to_i + end + + def partition_owner(zk, group, partition) + zk.get(path: "/consumers/#{group}/offsets/#{owners}/#{partition}")[:data] end def run - kafka_run_class = "#{config[:kafka_home]}/bin/kafka-run-class.sh" - unknown "Can not find #{kafka_run_class}" unless File.exist?(kafka_run_class) - - topics_to_read = [] - if config[:autolist].to_s == 'true' - cmd_topics = "#{kafka_run_class} kafka.admin.TopicCommand --zookeeper #{config[:zookeeper]} --list" - topics_to_read = run_topics(cmd_topics) - topics_to_read.delete_if { |x| config[:topic_excludes].include?(x) } if config[:topic_excludes] - end + z = Zookeeper.new(config[:zookeeper]) + + group = config[:group] + topics = kafka_topics(z, group) + + critical 'Could not found topics' if topics.empty? + + consumers = {} + topics.each do |topic| + consumers[topic] = {} - cmd_offset = "#{kafka_run_class} kafka.tools.ConsumerOffsetChecker --group #{config[:group]} --zookeeper #{config[:zookeeper]}" - cmd_offset += " --topic #{topics_to_read.join(',')}" unless topics_to_read.empty? + topics_partitions(z, topic).each do |partition| + owner = partition_owner(z, group, partition) - topics = run_offset(cmd_offset).group_by { |h| h[:topic] } + leader = leader_broker(z, topic, partition) + consumer = Poseidon::PartitionConsumer.new('CheckConsumerLag', leader['host'], leader['port'], topic, partition, :latest_offset) + logsize = consumer.next_offset - critical 'Could not found topics/partitions' if topics.empty? + offset = consumer_offset(z, group, topic, partition) + + lag = logsize - offset + + consumers[topic][:partition] = partition + consumers[topic][:logsize] = logsize + consumers[topic][:offset] = offset + consumers[topic][:lag] = lag + consumers[topic][:owner] = owner + end + end [:offset, :logsize, :lag].each do |field| - topics.map do |k, v| + consumers.map do |k, v| critical "Topic #{k} has partitions with #{field} < 0" unless v.select { |w| w[field].to_i < 0 }.empty? end end - topics.map do |k, v| + consumers.map do |k, v| critical "Topic #{k} has partitions with no owner" unless v.select { |w| w[:owner] == 'none' }.empty? end @@ -165,7 +157,7 @@ def run end when :under if min_lag < threshold.to_i - msg = "Topics `#{min_topics}` for the group `#{config[:group]}` lag: #{min_lag} (<= #{threshold})" + msg = "Topics `#{min_topics}` for the group `#{config[:group]}` lag: #{min_lag} (<= #{threshold})" send severity, msg end end diff --git a/sensu-plugins-kafka.gemspec b/sensu-plugins-kafka.gemspec index 79efba5..9e49103 100644 --- a/sensu-plugins-kafka.gemspec +++ b/sensu-plugins-kafka.gemspec @@ -34,6 +34,8 @@ Gem::Specification.new do |s| s.test_files = s.files.grep(%r{^(test|spec|features)/}) s.version = SensuPluginsKafka::Version::VER_STRING + s.add_runtime_dependency 'celluloid-io', '~> 0.17.3 ' + s.add_runtime_dependency 'poseidon', '0.0.5' s.add_runtime_dependency 'sensu-plugin', '~> 1.3' s.add_runtime_dependency 'zookeeper', '~> 1.4.11'