Skip to content

Commit

Permalink
Merge branch 'inactivity_checks'
Browse files Browse the repository at this point in the history
Conflicts:
	README.md
  • Loading branch information
mloughran committed Jul 15, 2014
2 parents 2b4d6b7 + 210c9d2 commit 283f664
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 1 deletion.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ As a final convenience, it is possible to load all lua scripts from a directory

For examples see `examples/lua.rb` or `lib/em-hiredis/lock_lua`.

## Inactivity checks

Sometimes a network connection may hang in ways which are difficult to detect or involve very long timeouts before they can be detected from the application layer. This is especially true of Redis Pubsub connections, as they are not request-response driven. It is very difficult for a listening client to descern between a hung connection and a server with nothing to say.

To start an application layer ping-pong mechanism for testing connection liveness, call the following at any time on a client:

redis.configure_inactivity_timeout(5, 3)

This configures a `PING` command to be sent if 5 seconds elapse without receiving any data from the server, and a reconnection to be triggered if a futher 3 seconds elapse after the `PING` is submitted.

This configuration is per client, you may choose different value for clients with different expected traffic patterns, or activate it on some and not at all on others.

### PING and Pubsub

Because the Redis Pubsub protocol limits the set of valid commands on a connection once it is in "Pubsub" mode, `PING` is not supported in this case (though it may be in future, see https://github.com/antirez/redis/issues/420). In order to create some valid request-response traffic on the connection, a Pubsub connection will issue `SUBSCRIBE "__em-hiredis-ping"`, followed by a corresponding `UNSUBSCRIBE` immediately on success of the subscribe.
While less than ideal, this is the case where an application layer inactivity check is most valuable, and so the trade off is reasonable until `PING` is supported correctly on Pubsub connections.

## Developing

You need bundler and a local redis server running on port 6379 to run the test suite.
Expand Down
42 changes: 42 additions & 0 deletions lib/em-hiredis/base_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def initialize(host = 'localhost', port = 6379, password = nil, db = nil)
@reconnect_timer = nil
@failed = false

@inactive_seconds = 0

self.on(:failed) {
@failed = true
@command_queue.each do |df, _, _|
Expand Down Expand Up @@ -72,6 +74,7 @@ def connect
@connection = EM.connect(@host, @port, Connection, @host, @port)

@connection.on(:closed) do
cancel_inactivity_checks
if @connected
@defs.each { |d| d.fail(Error.new("Redis disconnected")) }
@defs = []
Expand Down Expand Up @@ -115,6 +118,8 @@ def connect
end
@command_queue = []

schedule_inactivity_checks

emit(:connected)
EM::Hiredis.logger.info("#{@connection} Connected")
succeed
Expand All @@ -133,6 +138,7 @@ def connect
error.redis_error = reply
deferred.fail(error) if deferred
else
@inactive_seconds = 0
handle_reply(reply)
end
end
Expand Down Expand Up @@ -181,6 +187,21 @@ def reconnect_connection
reconnect
end

# Starts an inactivity checker which will ping redis if nothing has been
# heard on the connection for `trigger_secs` seconds and forces a reconnect
# after a further `response_timeout` seconds if we still don't hear anything.
def configure_inactivity_check(trigger_secs, response_timeout)
raise ArgumentError('trigger_secs must be > 0') unless trigger_secs.to_i > 0
raise ArgumentError('response_timeout must be > 0') unless response_timeout.to_i > 0

@inactivity_trigger_secs = trigger_secs.to_i
@inactivity_response_timeout = response_timeout.to_i

# Start the inactivity check now only if we're already conected, otherwise
# the connected event will schedule it.
schedule_inactivity_checks if @connected
end

private

def method_missing(sym, *args)
Expand All @@ -206,6 +227,27 @@ def reconnect
EM::Hiredis.logger.info("#{@connection} Reconnecting")
end

def cancel_inactivity_checks
EM.cancel_timer(@inactivity_timer) if @inactivity_timer
@inactivity_timer = nil
end

def schedule_inactivity_checks
if @inactivity_trigger_secs
@inactive_seconds = 0
@inactivity_timer = EM.add_periodic_timer(1) {
@inactive_seconds += 1
if @inactive_seconds > @inactivity_trigger_secs + @inactivity_response_timeout
EM::Hiredis.logger.error "#{@connection} No response to ping, triggering reconnect"
reconnect!
elsif @inactive_seconds > @inactivity_trigger_secs
EM::Hiredis.logger.debug "#{@connection} Connection inactive, triggering ping"
ping
end
}
end
end

def handle_reply(reply)
if @defs.empty?
if @monitoring
Expand Down
17 changes: 16 additions & 1 deletion lib/em-hiredis/pubsub_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module EventMachine::Hiredis
class PubsubClient < BaseClient
PUBSUB_MESSAGES = %w{message pmessage subscribe unsubscribe psubscribe punsubscribe}.freeze

PING_CHANNEL = '__em-hiredis-ping'

def initialize(host='localhost', port='6379', password=nil, db=nil)
@subs, @psubs = [], []
@pubsub_defs = Hash.new { |h,k| h[k] = [] }
Expand Down Expand Up @@ -125,7 +127,20 @@ def punsubscribe_proc(pattern, proc)
end
return df
end


# Pubsub connections to not support even the PING command, but it is useful,
# especially with read-only connections like pubsub, to be able to check that
# the TCP connection is still usefully alive.
#
# This is not particularly elegant, but it's probably the best we can do
# for now. Ping support for pubsub connections is being considerred:
# https://github.com/antirez/redis/issues/420
def ping
subscribe(PING_CHANNEL).callback {
unsubscribe(PING_CHANNEL)
}
end

private

# Send a command to redis without adding a deferrable for it. This is
Expand Down
66 changes: 66 additions & 0 deletions spec/inactivity_check_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
require 'spec_helper'
require 'support/inprocess_redis_mock'

def connect_mock(timeout = 10, url = "redis://localhost:6381", server = nil, &blk)
em(timeout) do
IRedisMock.start
redis = EventMachine::Hiredis.connect(url)
blk.call(redis)
IRedisMock.stop
end
end

describe EM::Hiredis::BaseClient do
it "should ping after activity timeout reached" do
connect_mock do |redis|
redis.configure_inactivity_check(2, 1)
EM.add_timer(4) {
IRedisMock.received.should include("ping")
done
}
end
end

it "should not ping before activity timeout reached" do
connect_mock do |redis|
redis.configure_inactivity_check(3, 1)
EM.add_timer(2) {
IRedisMock.received.should_not include("ping")
done
}
end
end

it "should ping after timeout reached even though command has been sent (no response)" do
connect_mock do |redis|
redis.configure_inactivity_check(2, 1)
IRedisMock.pause # no responses from now on

EM.add_timer(1.5) {
redis.get "test"
}

EM.add_timer(4) {
IRedisMock.received.should include("ping")
done
}
end
end

it "should trigger a reconnect when there's no response to ping" do
connect_mock do |redis|
redis.configure_inactivity_check(2, 1)
IRedisMock.pause # no responses from now on

EM.add_timer(1.5) {
redis.get "test"
}

EM.add_timer(5) {
IRedisMock.received.should include("disconnect")
done
}
end
end

end
83 changes: 83 additions & 0 deletions spec/support/inprocess_redis_mock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
module IRedisMock
def self.start(replies = {})
@sig = EventMachine::start_server("127.0.0.1", 6381, Connection)
@received = []
@replies = replies
@paused = false
end

def self.stop
EventMachine::stop_server(@sig)
end

def self.received
@received ||= []
end

def self.pause
@paused = true
end
def self.unpause
@paused = false
end

def self.paused
@paused
end

def self.replies
@replies
end

class Connection < EventMachine::Connection
def initialize
@data = ""
@parts = []
end

def unbind
IRedisMock.received << 'disconnect'
end

def receive_data(data)
@data << data

while (idx = @data.index("\r\n"))
@parts << @data[0..idx-1]
@data = @data[idx+2..-1]
end

while @parts.length > 0
throw "commands out of sync" unless @parts[0][0] == '*'

num_parts = @parts[0][1..-1].to_i * 2 + 1
return if @parts.length < num_parts

command_parts = @parts[0..num_parts]
@parts = @parts[num_parts..-1]

# Discard length declarations
command_line =
command_parts
.reject { |p| p[0] == '*' || p[0] == '$' }
.join ' '

if IRedisMock.replies.member?(command_line)
reply = IRedisMock.replies[command_line]
else
reply = "+OK"
end

# p "[#{command_line}] => [#{reply}]"

IRedisMock.received << command_line

if IRedisMock.paused
# puts "Paused, therefore not sending [#{reply}]"
else
send_data "#{reply}\r\n"
end
end
end
end
end

0 comments on commit 283f664

Please sign in to comment.