From 210d28bd4c6b746de5e8d9baf5844d1cf7478e6e Mon Sep 17 00:00:00 2001 From: Olivier Bazoud Date: Thu, 16 Feb 2017 11:49:37 +0100 Subject: [PATCH] Remove celluloid, fix data structure, fix partition_owner function --- CHANGELOG.md | 5 +++++ bin/check-consumer-lag.rb | 34 +++++++++++----------------------- sensu-plugins-kafka.gemspec | 1 - 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e152a60..206c7cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ This CHANGELOG follows the format listed at [Keep A Changelog](http://keepachang ## [Unreleased] +### Fixed +- check-consumer-lag: remove celluloid (temporary), fix data structure, fix partition_owner function + +## [0.8.0] + ### Breaking Changes - Refactoring check-consumer-lag to remove shell/jvm execution diff --git a/bin/check-consumer-lag.rb b/bin/check-consumer-lag.rb index e0518e3..adc70b5 100755 --- a/bin/check-consumer-lag.rb +++ b/bin/check-consumer-lag.rb @@ -27,17 +27,10 @@ require 'sensu-plugin/check/cli' -require 'celluloid-io' require 'json' require 'poseidon' require 'zookeeper' -module Poseidon - class Connection - TCPSocket = Celluloid::IO::TCPSocket - end -end - class ConsumerLagCheck < Sensu::Plugin::Check::CLI option :group, description: 'Consumer group', @@ -89,8 +82,8 @@ def consumer_offset(zk, group, topic, partition) zk.get(path: "/consumers/#{group}/offsets/#{topic}/#{partition}")[:data].to_i end - def partition_owner(zk, group, partition) - zk.get(path: "/consumers/#{group}/offsets/#{owners}/#{partition}")[:data] + def partition_owner(zk, group, topic, partition) + zk.get(path: "/consumers/#{group}/owners/#{topic}/#{partition}")[:data] end def run @@ -103,10 +96,10 @@ def run consumers = {} topics.each do |topic| - consumers[topic] = {} + consumers[topic] = [] topics_partitions(z, topic).each do |partition| - owner = partition_owner(z, group, partition) + owner = partition_owner(z, group, topic, partition) leader = leader_broker(z, topic, partition) consumer = Poseidon::PartitionConsumer.new('CheckConsumerLag', leader['host'], leader['port'], topic, partition, :latest_offset) @@ -115,27 +108,22 @@ def run offset = consumer_offset(z, group, topic, partition) lag = logsize - offset - - consumers[topic][:partition] = partition - consumers[topic][:logsize] = logsize - consumers[topic][:offset] = offset - consumers[topic][:lag] = lag - consumers[topic][:owner] = owner + consumers[topic].push(partition: partition, logsize: logsize, offset: offset, lag: lag, owner: owner) end end [:offset, :logsize, :lag].each do |field| - consumers.map do |k, v| - critical "Topic #{k} has partitions with #{field} < 0" unless v.select { |w| w[field].to_i < 0 }.empty? + consumers.each do |k, v| + critical "Topic #{k} has #{field} < 0 '#{v[field]}'" unless v.select { |w| w[field].to_i < 0 }.empty? end end - consumers.map do |k, v| - critical "Topic #{k} has partitions with no owner" unless v.select { |w| w[:owner] == 'none' }.empty? + consumers.each do |k, v| + critical "Topic #{k} has partitions with no owner '#{v[:owner]}'" unless v.select { |w| w[:owner] == 'none' }.empty? end - lags = topics.map do |k, v| - Hash[k, v.inject(0) { |a, e| a + e[:lag].to_i }] + lags = consumers.map do |k, v| + Hash[k, v.inject(0) { |a, e| a + e[:lag] }] end max_lag = lags.map(&:values).flatten.max diff --git a/sensu-plugins-kafka.gemspec b/sensu-plugins-kafka.gemspec index 9e49103..588900a 100644 --- a/sensu-plugins-kafka.gemspec +++ b/sensu-plugins-kafka.gemspec @@ -34,7 +34,6 @@ Gem::Specification.new do |s| s.test_files = s.files.grep(%r{^(test|spec|features)/}) s.version = SensuPluginsKafka::Version::VER_STRING - s.add_runtime_dependency 'celluloid-io', '~> 0.17.3 ' s.add_runtime_dependency 'poseidon', '0.0.5' s.add_runtime_dependency 'sensu-plugin', '~> 1.3' s.add_runtime_dependency 'zookeeper', '~> 1.4.11'