Skip to content

Fix: make sure plugin can be stop-ed #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .ci/docker-compose.override.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: '3'

services:
logstash:
network_mode: host
10 changes: 10 additions & 0 deletions .ci/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash
# This is intended to be run inside the docker container as the command of the docker-compose.

env

set -ex

jruby -rbundler/setup -S rspec -fd

jruby -rbundler/setup -S rspec -fd --tag redis
13 changes: 12 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
import:
- logstash-plugins/.ci:travis/travis.yml@1.x
- logstash-plugins/.ci:travis/travis.yml@1.x

addons:
apt:
sources:
- sourceline: 'ppa:chris-lea/redis-server'
packages:
- redis-server

before_install:
- sudo service redis-server stop
- sudo service redis-server start --bind 0.0.0.0
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 3.7.0
- Fix: make sure plugin can be stop-ed in case of a channel data_type [#87](https://github.com/logstash-plugins/logstash-input-redis/pull/87)
- Test: start running integration specs on CI

## 3.6.1
- Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86)

Expand Down
121 changes: 67 additions & 54 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,26 @@ def is_list_type?

# private
def redis_params
params = {
:timeout => @timeout,
:db => @db,
:password => @password.nil? ? nil : @password.value,
:ssl => @ssl
}

if @path.nil?
connectionParams = {
:host => @host,
:port => @port
}
params[:host] = @host
params[:port] = @port
else
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
connectionParams = {
:path => @path
}
params[:path] = @path
end

baseParams = {
:timeout => @timeout,
:db => @db,
:password => @password.nil? ? nil : @password.value,
:ssl => @ssl
}

return connectionParams.merge(baseParams)
params
end

TIMEOUT = 5 # Redis only supports Integer values
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this timeout is internal for the redis (read/write) operation, we catch TimeoutError and retry in a loop,
provides a way for interrupting the input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have multiple timeouts here in layers, it would be helpful to name this something distinct.

Suggested change
TIMEOUT = 5 # Redis only supports Integer values
BLOCKING_CALL_TIMEOUT = 5 # Redis only supports Integer values


def new_redis_instance
::Redis.new(redis_params)
end
Expand Down Expand Up @@ -174,9 +172,12 @@ def queue_event(msg, output_queue, channel=nil)

# private
def list_stop
return if @redis.nil? || !@redis.connected?
redis = @redis # might change during method invocation
return if redis.nil? || !redis.connected?

@redis.quit rescue nil
redis.quit rescue nil
# check if input retried while executing
list_stop unless redis.equal? @redis
@redis = nil
end

Expand All @@ -186,15 +187,8 @@ def list_runner(output_queue)
begin
@redis ||= connect
@list_method.call(@redis, output_queue)
rescue ::Redis::BaseError => e
info = { message: e.message, exception: e.class }
info[:backtrace] = e.backtrace if @logger.debug?
@logger.warn("Redis connection problem", info)
# Reset the redis variable to trigger reconnect
@redis = nil
# this sleep does not need to be stoppable as its
# in a while !stop? loop
sleep 1
rescue => e
retry if handle_error(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I would expect a method called handle_error to return true if the error had been handled, but the way that we retry here indicates the opposite.

end
end
end
Expand Down Expand Up @@ -238,7 +232,7 @@ def list_batch_listener(redis, output_queue)
end

def list_single_listener(redis, output_queue)
item = redis.blpop(@key, 0, :timeout => 1)
item = redis.blpop(@key, 0, :timeout => TIMEOUT)
return unless item # from timeout or other conditions

# blpop returns the 'key' read from as well as the item result
Expand All @@ -248,18 +242,20 @@ def list_single_listener(redis, output_queue)

# private
def subscribe_stop
return if @redis.nil? || !@redis.connected?
# if its a SubscribedClient then:
# it does not have a disconnect method (yet)
if @redis.subscribed?
redis = @redis # might change during method invocation
return if redis.nil? || !redis.connected?

if redis.subscribed?
if @data_type == 'pattern_channel'
@redis.punsubscribe
redis.punsubscribe
else
@redis.unsubscribe
redis.unsubscribe
end
else
@redis.disconnect!
redis.disconnect!
end
# check if input retried while executing
subscribe_stop unless redis.equal? @redis
@redis = nil
end

Expand All @@ -268,62 +264,79 @@ def redis_runner
begin
@redis ||= connect
yield
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem", :exception => e)
# Reset the redis variable to trigger reconnect
@redis = nil
Stud.stoppable_sleep(1) { stop? }
retry if !stop?
rescue => e
retry if handle_error(e)
end
end

def handle_error(e)
info = { message: e.message, exception: e.class }
info[:backtrace] = e.backtrace if @logger.debug?

case e
when ::Redis::TimeoutError
# expected for channels in case no data is available
@logger.debug("Redis timeout, retrying", info)
when ::Redis::BaseConnectionError, ::Redis::ProtocolError
@logger.warn("Redis connection error", info)
when ::Redis::BaseError
@logger.error("Redis error", info)
when ::LogStash::ShutdownSignal
@logger.debug("Received shutdown signal")
return false # stop retry-ing
else
info[:backtrace] ||= e.backtrace
@logger.error("Unexpected error", info)
end

# Reset the redis variable to trigger reconnect
@redis = nil

Stud.stoppable_sleep(1) { stop? }
!stop? # return true unless stop?
end

# private
def channel_runner(output_queue)
redis_runner do
channel_listener(output_queue)
end
redis_runner { channel_listener(output_queue) }
end

# private
def channel_listener(output_queue)
@redis.subscribe(@key) do |on|
@redis.subscribe_with_timeout(TIMEOUT, @key) do |on|
on.subscribe do |channel, count|
@logger.info("Subscribed", :channel => channel, :count => count)
@logger.debug("Subscribed", :channel => channel, :count => count)
end

on.message do |channel, message|
queue_event(message, output_queue, channel)
end

on.unsubscribe do |channel, count|
@logger.info("Unsubscribed", :channel => channel, :count => count)
@logger.debug("Unsubscribed", :channel => channel, :count => count)
end
end
end

def pattern_channel_runner(output_queue)
redis_runner do
pattern_channel_listener(output_queue)
end
redis_runner { pattern_channel_listener(output_queue) }
end

# private
def pattern_channel_listener(output_queue)
@redis.psubscribe @key do |on|
@redis.psubscribe_with_timeout(TIMEOUT, @key) do |on|
on.psubscribe do |channel, count|
@logger.info("Subscribed", :channel => channel, :count => count)
@logger.debug("Subscribed", :channel => channel, :count => count)
end

on.pmessage do |pattern, channel, message|
queue_event(message, output_queue, channel)
end

on.punsubscribe do |channel, count|
@logger.info("Unsubscribed", :channel => channel, :count => count)
@logger.debug("Unsubscribed", :channel => channel, :count => count)
end
end
end

# end

end end end # Redis Inputs LogStash
2 changes: 1 addition & 1 deletion logstash-input-redis.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-redis'
s.version = '3.6.1'
s.version = '3.7.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads events from a Redis instance"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
Loading