-
Notifications
You must be signed in to change notification settings - Fork 69
Fix: make sure plugin can be stop-ed #87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
203b4fb
Test: fix "integration" specs (using Redis)
kares 5adad05
CI: enable testing with real Redis server
kares 2676c6c
CI: override compose for Redis port mapping
kares 571f2c2
CI: fix redis service
kares d5ba181
CI: Traviiis!
kares 523c5b7
CI: on focal let's try redis
kares df0aa25
CI: here we go again install-ing manually
kares 496ee95
CI: and of course rebind to 127.0.0.1
kares dd8eb2d
CI: default bind should also use docker0
kares 1981ca3
CI: let's bind 0.0.0.0 as a last resort
kares d1488de
CI: finally access the host network
kares 97c759c
Test: skip subscription types (not using timeout)
kares 1ce6a57
Fix: avoid uninterruptible loops (with subscribtions)
kares f096488
Refactor: simplify method code
kares fa9f89d
Test: the timeout value is irrelevant here
kares bd20251
Test: ('timeout') behavior with channels
kares 613e9ca
Refactor: dry-out + improve error handling
kares 364fff8
Chore: bump + changelog
kares 8dac1cf
Refactor: safer plugin.stop (called from another thread)
kares b19e1d9
Fix: handle LS::ShutdownSignal
kares File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
version: '3' | ||
|
||
services: | ||
logstash: | ||
network_mode: host |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
#!/bin/bash | ||
# This is intended to be run inside the docker container as the command of the docker-compose. | ||
|
||
env | ||
|
||
set -ex | ||
|
||
jruby -rbundler/setup -S rspec -fd | ||
|
||
jruby -rbundler/setup -S rspec -fd --tag redis |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,13 @@ | ||
import: | ||
- logstash-plugins/.ci:travis/travis.yml@1.x | ||
- logstash-plugins/.ci:travis/travis.yml@1.x | ||
|
||
addons: | ||
apt: | ||
sources: | ||
- sourceline: 'ppa:chris-lea/redis-server' | ||
packages: | ||
- redis-server | ||
|
||
before_install: | ||
- sudo service redis-server stop | ||
- sudo service redis-server start --bind 0.0.0.0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,28 +107,26 @@ def is_list_type? | |
|
||
# private | ||
def redis_params | ||
params = { | ||
:timeout => @timeout, | ||
:db => @db, | ||
:password => @password.nil? ? nil : @password.value, | ||
:ssl => @ssl | ||
} | ||
|
||
if @path.nil? | ||
connectionParams = { | ||
:host => @host, | ||
:port => @port | ||
} | ||
params[:host] = @host | ||
params[:port] = @port | ||
else | ||
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'") | ||
connectionParams = { | ||
:path => @path | ||
} | ||
params[:path] = @path | ||
end | ||
|
||
baseParams = { | ||
:timeout => @timeout, | ||
:db => @db, | ||
:password => @password.nil? ? nil : @password.value, | ||
:ssl => @ssl | ||
} | ||
|
||
return connectionParams.merge(baseParams) | ||
params | ||
end | ||
|
||
TIMEOUT = 5 # Redis only supports Integer values | ||
|
||
def new_redis_instance | ||
::Redis.new(redis_params) | ||
end | ||
|
@@ -174,9 +172,12 @@ def queue_event(msg, output_queue, channel=nil) | |
|
||
# private | ||
def list_stop | ||
return if @redis.nil? || !@redis.connected? | ||
redis = @redis # might change during method invocation | ||
return if redis.nil? || !redis.connected? | ||
|
||
@redis.quit rescue nil | ||
redis.quit rescue nil | ||
# check if input retried while executing | ||
list_stop unless redis.equal? @redis | ||
@redis = nil | ||
end | ||
|
||
|
@@ -186,15 +187,8 @@ def list_runner(output_queue) | |
begin | ||
@redis ||= connect | ||
@list_method.call(@redis, output_queue) | ||
rescue ::Redis::BaseError => e | ||
info = { message: e.message, exception: e.class } | ||
info[:backtrace] = e.backtrace if @logger.debug? | ||
@logger.warn("Redis connection problem", info) | ||
# Reset the redis variable to trigger reconnect | ||
@redis = nil | ||
# this sleep does not need to be stoppable as its | ||
# in a while !stop? loop | ||
sleep 1 | ||
rescue => e | ||
retry if handle_error(e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: I would expect a method called |
||
end | ||
end | ||
end | ||
|
@@ -238,7 +232,7 @@ def list_batch_listener(redis, output_queue) | |
end | ||
|
||
def list_single_listener(redis, output_queue) | ||
item = redis.blpop(@key, 0, :timeout => 1) | ||
item = redis.blpop(@key, 0, :timeout => TIMEOUT) | ||
return unless item # from timeout or other conditions | ||
|
||
# blpop returns the 'key' read from as well as the item result | ||
|
@@ -248,18 +242,20 @@ def list_single_listener(redis, output_queue) | |
|
||
# private | ||
def subscribe_stop | ||
return if @redis.nil? || !@redis.connected? | ||
# if its a SubscribedClient then: | ||
# it does not have a disconnect method (yet) | ||
if @redis.subscribed? | ||
redis = @redis # might change during method invocation | ||
return if redis.nil? || !redis.connected? | ||
|
||
if redis.subscribed? | ||
if @data_type == 'pattern_channel' | ||
@redis.punsubscribe | ||
redis.punsubscribe | ||
else | ||
@redis.unsubscribe | ||
redis.unsubscribe | ||
end | ||
else | ||
@redis.disconnect! | ||
redis.disconnect! | ||
end | ||
# check if input retried while executing | ||
subscribe_stop unless redis.equal? @redis | ||
@redis = nil | ||
end | ||
|
||
|
@@ -268,62 +264,79 @@ def redis_runner | |
begin | ||
@redis ||= connect | ||
yield | ||
rescue ::Redis::BaseError => e | ||
@logger.warn("Redis connection problem", :exception => e) | ||
# Reset the redis variable to trigger reconnect | ||
@redis = nil | ||
Stud.stoppable_sleep(1) { stop? } | ||
retry if !stop? | ||
rescue => e | ||
retry if handle_error(e) | ||
end | ||
end | ||
|
||
def handle_error(e) | ||
info = { message: e.message, exception: e.class } | ||
info[:backtrace] = e.backtrace if @logger.debug? | ||
|
||
case e | ||
when ::Redis::TimeoutError | ||
# expected for channels in case no data is available | ||
@logger.debug("Redis timeout, retrying", info) | ||
when ::Redis::BaseConnectionError, ::Redis::ProtocolError | ||
@logger.warn("Redis connection error", info) | ||
when ::Redis::BaseError | ||
@logger.error("Redis error", info) | ||
when ::LogStash::ShutdownSignal | ||
@logger.debug("Received shutdown signal") | ||
return false # stop retry-ing | ||
else | ||
info[:backtrace] ||= e.backtrace | ||
@logger.error("Unexpected error", info) | ||
end | ||
|
||
# Reset the redis variable to trigger reconnect | ||
@redis = nil | ||
|
||
Stud.stoppable_sleep(1) { stop? } | ||
!stop? # return true unless stop? | ||
end | ||
|
||
# private | ||
def channel_runner(output_queue) | ||
redis_runner do | ||
channel_listener(output_queue) | ||
end | ||
redis_runner { channel_listener(output_queue) } | ||
end | ||
|
||
# private | ||
def channel_listener(output_queue) | ||
@redis.subscribe(@key) do |on| | ||
@redis.subscribe_with_timeout(TIMEOUT, @key) do |on| | ||
on.subscribe do |channel, count| | ||
@logger.info("Subscribed", :channel => channel, :count => count) | ||
@logger.debug("Subscribed", :channel => channel, :count => count) | ||
end | ||
|
||
on.message do |channel, message| | ||
queue_event(message, output_queue, channel) | ||
end | ||
|
||
on.unsubscribe do |channel, count| | ||
@logger.info("Unsubscribed", :channel => channel, :count => count) | ||
@logger.debug("Unsubscribed", :channel => channel, :count => count) | ||
end | ||
end | ||
end | ||
|
||
def pattern_channel_runner(output_queue) | ||
redis_runner do | ||
pattern_channel_listener(output_queue) | ||
end | ||
redis_runner { pattern_channel_listener(output_queue) } | ||
end | ||
|
||
# private | ||
def pattern_channel_listener(output_queue) | ||
@redis.psubscribe @key do |on| | ||
@redis.psubscribe_with_timeout(TIMEOUT, @key) do |on| | ||
on.psubscribe do |channel, count| | ||
@logger.info("Subscribed", :channel => channel, :count => count) | ||
@logger.debug("Subscribed", :channel => channel, :count => count) | ||
end | ||
|
||
on.pmessage do |pattern, channel, message| | ||
queue_event(message, output_queue, channel) | ||
end | ||
|
||
on.punsubscribe do |channel, count| | ||
@logger.info("Unsubscribed", :channel => channel, :count => count) | ||
@logger.debug("Unsubscribed", :channel => channel, :count => count) | ||
end | ||
end | ||
end | ||
|
||
# end | ||
|
||
end end end # Redis Inputs LogStash |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this timeout is internal for the redis (read/write) operation, we catch
TimeoutError
and retry in a loop,provides a way for interrupting the input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have multiple timeouts here in layers, it would be helpful to name this something distinct.