Skip to content

Fix: better (Redis) exception handling #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
203b4fb
Test: fix "integration" specs (using Redis)
kares Apr 7, 2021
5adad05
CI: enable testing with real Redis server
kares Apr 7, 2021
2676c6c
CI: override compose for Redis port mapping
kares Apr 8, 2021
571f2c2
CI: fix redis service
kares Apr 8, 2021
d5ba181
CI: Traviiis!
kares Apr 8, 2021
523c5b7
CI: on focal let's try redis
kares Apr 8, 2021
df0aa25
CI: here we go again install-ing manually
kares Apr 8, 2021
496ee95
CI: and of course rebind to 127.0.0.1
kares Apr 8, 2021
dd8eb2d
CI: default bind should also use docker0
kares Apr 8, 2021
1981ca3
CI: let's bind 0.0.0.0 as a last resort
kares Apr 8, 2021
d1488de
CI: finally access the host network
kares Apr 8, 2021
97c759c
Test: skip subscription types (not using timeout)
kares Apr 8, 2021
1ce6a57
Fix: avoid uninterruptible loops (with subscribtions)
kares Apr 12, 2021
f096488
Refactor: simplify method code
kares Apr 12, 2021
fa9f89d
Test: the timeout value is irrelevant here
kares Apr 12, 2021
bd20251
Test: ('timeout') behavior with channels
kares Apr 12, 2021
613e9ca
Refactor: dry-out + improve error handling
kares Apr 13, 2021
364fff8
Chore: bump + changelog
kares Apr 13, 2021
8dac1cf
Refactor: safer plugin.stop (called from another thread)
kares Apr 13, 2021
b19e1d9
Fix: handle LS::ShutdownSignal
kares Apr 13, 2021
30498fd
Revert "Test: ('timeout') behavior with channels"
kares Apr 15, 2021
4b9eeea
Revert "Fix: avoid uninterruptible loops (with subscribtions)"
kares Apr 15, 2021
a1a6a43
Refactor: handle_error return based on review
kares Apr 15, 2021
a7a6a7f
Test: Redis behavior raising an IOError
kares Apr 15, 2021
0184b03
changelog
kares Apr 15, 2021
9f503e1
Test: try harder to meet expectation
kares Apr 19, 2021
200e9bf
Refactor: split handle_error into two
kares May 3, 2021
1f429ac
Review: always close (disconnect) redis client
kares May 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .ci/docker-compose.override.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: '3'

services:
logstash:
network_mode: host
10 changes: 10 additions & 0 deletions .ci/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash
# This is intended to be run inside the docker container as the command of the docker-compose.

# Print the current environment variables to the output.
env

# From here on: exit on the first failing command (-e) and echo each command before it runs (-x).
set -ex

# First pass: run rspec with the documentation formatter (-fd) and no tag filter on the command line.
jruby -rbundler/setup -S rspec -fd

# Second pass: run rspec again, restricted to examples tagged `redis`.
# NOTE(review): presumably these are the integration specs that need a reachable Redis server
# and are excluded from the first pass by the spec configuration - confirm against spec_helper.
jruby -rbundler/setup -S rspec -fd --tag redis
13 changes: 12 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
import:
- logstash-plugins/.ci:travis/travis.yml@1.x
- logstash-plugins/.ci:travis/travis.yml@1.x

addons:
apt:
sources:
- sourceline: 'ppa:chris-lea/redis-server'
packages:
- redis-server

before_install:
- sudo service redis-server stop
- sudo service redis-server start --bind 0.0.0.0
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 3.7.0
- Fix: better (Redis) exception handling [#89](https://github.com/logstash-plugins/logstash-input-redis/pull/89)
- Test: start running integration specs on CI

## 3.6.1
- Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86)

Expand Down
104 changes: 62 additions & 42 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,22 @@ def is_list_type?

# private
def redis_params
params = {
:timeout => @timeout,
:db => @db,
:password => @password.nil? ? nil : @password.value,
:ssl => @ssl
}

if @path.nil?
connectionParams = {
:host => @host,
:port => @port
}
params[:host] = @host
params[:port] = @port
else
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
connectionParams = {
:path => @path
}
params[:path] = @path
end

baseParams = {
:timeout => @timeout,
:db => @db,
:password => @password.nil? ? nil : @password.value,
:ssl => @ssl
}

return connectionParams.merge(baseParams)
params
end

def new_redis_instance
Expand Down Expand Up @@ -174,9 +170,12 @@ def queue_event(msg, output_queue, channel=nil)

# private
def list_stop
return if @redis.nil? || !@redis.connected?
redis = @redis # might change during method invocation
return if redis.nil? || !redis.connected?

@redis.quit rescue nil
redis.quit rescue nil # does client.disconnect internally
# check if input retried while executing
list_stop unless redis.equal? @redis
@redis = nil
end

Expand All @@ -186,15 +185,9 @@ def list_runner(output_queue)
begin
@redis ||= connect
@list_method.call(@redis, output_queue)
rescue ::Redis::BaseError => e
info = { message: e.message, exception: e.class }
info[:backtrace] = e.backtrace if @logger.debug?
@logger.warn("Redis connection problem", info)
# Reset the redis variable to trigger reconnect
@redis = nil
# this sleep does not need to be stoppable as its
# in a while !stop? loop
sleep 1
rescue => e
log_error(e)
retry if reset_for_error_retry(e)
end
end
end
Expand Down Expand Up @@ -248,18 +241,19 @@ def list_single_listener(redis, output_queue)

# private
def subscribe_stop
return if @redis.nil? || !@redis.connected?
# if its a SubscribedClient then:
# it does not have a disconnect method (yet)
if @redis.subscribed?
redis = @redis # might change during method invocation
return if redis.nil? || !redis.connected?

if redis.subscribed?
if @data_type == 'pattern_channel'
@redis.punsubscribe
redis.punsubscribe
else
@redis.unsubscribe
redis.unsubscribe
end
else
@redis.disconnect!
end
redis.close rescue nil # does client.disconnect
# check if input retried while executing
subscribe_stop unless redis.equal? @redis
@redis = nil
end

Expand All @@ -268,15 +262,43 @@ def redis_runner
begin
@redis ||= connect
yield
rescue ::Redis::BaseError => e
@logger.warn("Redis connection problem", :exception => e)
# Reset the redis variable to trigger reconnect
@redis = nil
Stud.stoppable_sleep(1) { stop? }
retry if !stop?
rescue => e
log_error(e)
retry if reset_for_error_retry(e)
end
end

# Logs a rescued error at a severity chosen by the error's class:
#
# - Redis timeout               -> debug (expected for channels when no data is available)
# - Redis connection / protocol -> warn
# - any other Redis error       -> error
# - Logstash shutdown signal    -> debug (message only, no details attached)
# - anything else               -> error, always including the backtrace
#
# @param e [Exception] the rescued error
def log_error(e)
  details = { message: e.message, exception: e.class }
  # the backtrace is only attached up-front when debug logging is enabled
  details[:backtrace] = e.backtrace if @logger.debug?

  if e.is_a?(::Redis::TimeoutError)
    # expected for channels in case no data is available
    @logger.debug("Redis timeout, retrying", details)
  elsif e.is_a?(::Redis::BaseConnectionError) || e.is_a?(::Redis::ProtocolError)
    @logger.warn("Redis connection error", details)
  elsif e.is_a?(::Redis::BaseError)
    @logger.error("Redis error", details)
  elsif e.is_a?(::LogStash::ShutdownSignal)
    @logger.debug("Received shutdown signal")
  else
    # unexpected errors always carry a backtrace, regardless of the log level
    details[:backtrace] ||= e.backtrace
    @logger.error("Unexpected error", details)
  end
end

# Prepares the input for another attempt after a failure.
#
# A shutdown signal is never retried and leaves the current client untouched.
# For any other error the client reference is dropped (so the next attempt
# reconnects) and a stoppable one second pause is taken before deciding.
#
# @param e [Exception] the rescued error
# @return [true] if operation is fine to retry
# @return [false] if the plugin is stopping
# @return [nil] if the error is a shutdown signal
def reset_for_error_retry(e)
  unless e.is_a?(::LogStash::ShutdownSignal)
    # Reset the redis variable to trigger reconnect
    @redis = nil

    # the pause is cut short as soon as the plugin is asked to stop
    Stud.stoppable_sleep(1) { stop? }
    !stop? # retry if not stop-ing
  end
end

# private
def channel_runner(output_queue)
redis_runner do
Expand Down Expand Up @@ -324,6 +346,4 @@ def pattern_channel_listener(output_queue)
end
end

# end

end end end # Redis Inputs LogStash
2 changes: 1 addition & 1 deletion logstash-input-redis.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-redis'
s.version = '3.6.1'
s.version = '3.7.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads events from a Redis instance"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
89 changes: 80 additions & 9 deletions spec/inputs/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ def populate(key, event_count)
end

def process(conf, event_count)
events = input(conf) do |pipeline, queue|
event_count.times.map{queue.pop}
events = input(conf) do |_, queue|
sleep 0.1 until queue.size >= event_count
queue.size.times.map { queue.pop }
end

expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a)
# due to multiple workers we get events out-of-order in the output
events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') }
expect(events[0].get('sequence')).to eq(0)
expect(events[100].get('sequence')).to eq(100)
expect(events[1000].get('sequence')).to eq(1000)
end

# integration tests ---------------------
Expand All @@ -31,7 +35,6 @@ def process(conf, event_count)
it "should read events from a list" do
key = SecureRandom.hex
event_count = 1000 + rand(50)
# event_count = 100
conf = <<-CONFIG
input {
redis {
Expand Down Expand Up @@ -163,7 +166,6 @@ def process(conf, event_count)
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
expect(command[0]).to eql :blpop
expect(command[1]).to eql ['foo', 0]
expect(command[2]).to eql 1
end.and_return ['foo', "{\"foo1\":\"bar\""], nil

tt = Thread.new do
Expand All @@ -178,6 +180,69 @@ def process(conf, event_count)
expect( queue.size ).to be > 0
end

it 'keeps running when a connection error occurs' do
raised = false
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
expect(command[0]).to eql :blpop
unless raised
raised = true
raise Redis::CannotConnectError.new('test')
end
['foo', "{\"after\":\"raise\"}"]
end

expect(subject.logger).to receive(:warn).with('Redis connection error',
hash_including(:message=>"test", :exception=>Redis::CannotConnectError)
).and_call_original

tt = Thread.new do
sleep 2.0 # allow for the retry sleep in reset_for_error_retry
subject.do_stop
end

subject.run(queue)

tt.join

try(3) { expect( queue.size ).to be > 0 }
end

context 'error handling' do

let(:config) do
super().merge 'batch_count' => 2
end

it 'keeps running when a (non-Redis) io error occurs' do
raised = false
allow(subject).to receive(:connect).and_return redis = double('redis')
allow(redis).to receive(:blpop).and_return nil
expect(redis).to receive(:evalsha) do
unless raised
raised = true
raise IOError.new('closed stream')
end
[]
end.at_least(1)
redis
allow(subject).to receive(:stop)

expect(subject.logger).to receive(:error).with('Unexpected error',
hash_including(:message=>'closed stream', :exception=>IOError)
).and_call_original

tt = Thread.new do
sleep 2.0 # allow for the retry sleep in reset_for_error_retry
subject.do_stop
end

subject.run(queue)

tt.join
end

end

context "when the batch size is greater than 1" do
let(:batch_count) { 10 }

Expand Down Expand Up @@ -233,9 +298,6 @@ def process(conf, event_count)
end

it 'multiple close calls, calls to redis once' do
# subject.use_redis(redis)
# allow(redis).to receive(:blpop).and_return(['foo', 'l1'])
# expect(redis).to receive(:connected?).and_return(connected.last)
allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
# allow_any_instance_of( Redis::Client ).to receive(:disconnect)
quit_calls.each do |call|
Expand All @@ -249,6 +311,9 @@ def process(conf, event_count)
end

context 'for the subscribe data_types' do

before { subject.register }

def run_it_thread(inst)
Thread.new(inst) do |subj|
subj.run(queue)
Expand Down Expand Up @@ -289,6 +354,8 @@ def close_thread(inst, rt)
let(:data_type) { 'channel' }
let(:quit_calls) { [:unsubscribe, :connection] }

before { subject.register }

context 'mocked redis' do
it 'multiple stop calls, calls to redis once', type: :mocked do
subject.do_stop
Expand Down Expand Up @@ -367,6 +434,10 @@ def close_thread(inst, rt)

["list", "channel", "pattern_channel"].each do |data_type|
context data_type do
# TODO pending
# redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout
next unless data_type == 'list'

it_behaves_like "an interruptible input plugin", :redis => true do
let(:config) { { 'key' => 'foo', 'data_type' => data_type } }
end
Expand Down