Skip to content

Commit

Permalink
Remove celluloid, fix data structure, fix partition_owner function
Browse files Browse the repository at this point in the history
  • Loading branch information
obazoud committed Feb 16, 2017
1 parent e21fc14 commit 210d28b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 24 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ This CHANGELOG follows the format listed at [Keep A Changelog](http://keepachang

## [Unreleased]

### Fixed
- check-consumer-lag: remove celluloid (temporary), fix data structure, fix partition_owner function

## [0.8.0]

### Breaking Changes
- Refactoring check-consumer-lag to remove shell/jvm execution

Expand Down
34 changes: 11 additions & 23 deletions bin/check-consumer-lag.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,10 @@

require 'sensu-plugin/check/cli'

require 'celluloid-io'
require 'json'
require 'poseidon'
require 'zookeeper'

module Poseidon
class Connection
TCPSocket = Celluloid::IO::TCPSocket
end
end

class ConsumerLagCheck < Sensu::Plugin::Check::CLI
option :group,
description: 'Consumer group',
Expand Down Expand Up @@ -89,8 +82,8 @@ def consumer_offset(zk, group, topic, partition)
zk.get(path: "/consumers/#{group}/offsets/#{topic}/#{partition}")[:data].to_i
end

def partition_owner(zk, group, partition)
zk.get(path: "/consumers/#{group}/offsets/#{owners}/#{partition}")[:data]
def partition_owner(zk, group, topic, partition)
zk.get(path: "/consumers/#{group}/owners/#{topic}/#{partition}")[:data]
end

def run
Expand All @@ -103,10 +96,10 @@ def run

consumers = {}
topics.each do |topic|
consumers[topic] = {}
consumers[topic] = []

topics_partitions(z, topic).each do |partition|
owner = partition_owner(z, group, partition)
owner = partition_owner(z, group, topic, partition)

leader = leader_broker(z, topic, partition)
consumer = Poseidon::PartitionConsumer.new('CheckConsumerLag', leader['host'], leader['port'], topic, partition, :latest_offset)
Expand All @@ -115,27 +108,22 @@ def run
offset = consumer_offset(z, group, topic, partition)

lag = logsize - offset

consumers[topic][:partition] = partition
consumers[topic][:logsize] = logsize
consumers[topic][:offset] = offset
consumers[topic][:lag] = lag
consumers[topic][:owner] = owner
consumers[topic].push(partition: partition, logsize: logsize, offset: offset, lag: lag, owner: owner)
end
end

[:offset, :logsize, :lag].each do |field|
consumers.map do |k, v|
critical "Topic #{k} has partitions with #{field} < 0" unless v.select { |w| w[field].to_i < 0 }.empty?
consumers.each do |k, v|
critical "Topic #{k} has #{field} < 0 '#{v[field]}'" unless v.select { |w| w[field].to_i < 0 }.empty?
end
end

consumers.map do |k, v|
critical "Topic #{k} has partitions with no owner" unless v.select { |w| w[:owner] == 'none' }.empty?
consumers.each do |k, v|
critical "Topic #{k} has partitions with no owner '#{v[:owner]}'" unless v.select { |w| w[:owner] == 'none' }.empty?
end

lags = topics.map do |k, v|
Hash[k, v.inject(0) { |a, e| a + e[:lag].to_i }]
lags = consumers.map do |k, v|
Hash[k, v.inject(0) { |a, e| a + e[:lag] }]
end

max_lag = lags.map(&:values).flatten.max
Expand Down
1 change: 0 additions & 1 deletion sensu-plugins-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ Gem::Specification.new do |s|
s.test_files = s.files.grep(%r{^(test|spec|features)/})
s.version = SensuPluginsKafka::Version::VER_STRING

s.add_runtime_dependency 'celluloid-io', '~> 0.17.3 '
s.add_runtime_dependency 'poseidon', '0.0.5'
s.add_runtime_dependency 'sensu-plugin', '~> 1.3'
s.add_runtime_dependency 'zookeeper', '~> 1.4.11'
Expand Down

0 comments on commit 210d28b

Please sign in to comment.