Skip to content

Add Redis Cluster support #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Gemfile.lock
/tmp/
/.idea
/.yardoc
/.bundle
/coverage/*
/doc/
/examples/sentinel/sentinel.conf
Expand All @@ -14,3 +15,5 @@ Gemfile.lock
/redis/*
/test/db
/test/test.conf
appendonly.aof
temp-rewriteaof-*.aof
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ before_install:
- gem update --system 2.6.14
- gem --version

script: make test
script: make

rvm:
- 2.2.2
Expand All @@ -25,7 +25,7 @@ before_script:
env:
global:
- VERBOSE=true
- TIMEOUT=1
- TIMEOUT=9
matrix:
- DRIVER=ruby REDIS_BRANCH=3.0
- DRIVER=ruby REDIS_BRANCH=3.2
Expand Down
99 changes: 94 additions & 5 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ def self.current=(redis)
# @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not
# @option options [Array] :sentinels List of sentinels to contact
# @option options [Symbol] :role (:master) Role to fetch via Sentinel, either `:master` or `:slave`
# @option options [Array<String, Hash{Symbol => String, Integer}>] :cluster List of cluster nodes to contact
# @option options [Boolean] :replica Whether to use readonly replica nodes in Redis Cluster or not
# @option options [Class] :connector Class of custom connector
#
# @return [Redis] a new client instance
def initialize(options = {})
@options = options.dup
@original_client = @client = Client.new(options)
@cluster_mode = options.key?(:cluster)
client = @cluster_mode ? Cluster : Client
@original_client = @client = client.new(options)
@queue = Hash.new { |h, k| h[k] = [] }

super() # Monitor#initialize
Expand Down Expand Up @@ -274,9 +278,7 @@ def info(cmd = nil)
synchronize do |client|
client.call([:info, cmd].compact) do |reply|
if reply.kind_of?(String)
reply = Hash[reply.split("\r\n").map do |line|
line.split(":", 2) unless line =~ /^(#|$)/
end.compact]
reply = HashifyInfo.call(reply)

if cmd && cmd.to_s == "commandstats"
# Extract nested hashes for INFO COMMANDSTATS
Expand Down Expand Up @@ -2818,6 +2820,41 @@ def sentinel(subcommand, *args)
end
end

# Sends `CLUSTER *` command to random node and returns its reply.
#
# @see https://redis.io/commands#cluster Reference of cluster command
#
# @param subcommand [String, Symbol] the subcommand of cluster command
# e.g. `:slots`, `:nodes`, `:slaves`, `:info`
#
# @return [Object] depends on the subcommand
def cluster(subcommand, *args)
subcommand = subcommand.to_s.downcase
block = case subcommand
when 'slots' then HashifyClusterSlots
when 'nodes' then HashifyClusterNodes
when 'slaves' then HashifyClusterSlaves
when 'info' then HashifyInfo
else Noop
end

# @see https://github.com/antirez/redis/blob/unstable/src/redis-trib.rb#L127 raw reply expected
block = Noop unless @cluster_mode

synchronize do |client|
client.call([:cluster, subcommand] + args, &block)
end
end

# Sends `ASKING` command to random node and returns its reply.
#
# @see https://redis.io/topics/cluster-spec#ask-redirection ASK redirection
#
# @return [String] `'OK'`
def asking
synchronize { |client| client.call(%i[asking]) }
end

def id
@original_client.id
end
Expand All @@ -2831,6 +2868,8 @@ def dup
end

def connection
return @original_client.connection_info if @cluster_mode

{
host: @original_client.host,
port: @original_client.port,
Expand Down Expand Up @@ -2896,6 +2935,56 @@ def method_missing(command, *args)
end
}

HashifyInfo =
lambda { |reply|
Hash[reply.split("\r\n").map do |line|
line.split(':', 2) unless line =~ /^(#|$)/
end.compact]
}

HashifyClusterNodeInfo =
lambda { |str|
arr = str.split(' ')
{
'node_id' => arr[0],
'ip_port' => arr[1],
'flags' => arr[2].split(','),
'master_node_id' => arr[3],
'ping_sent' => arr[4],
'pong_recv' => arr[5],
'config_epoch' => arr[6],
'link_state' => arr[7],
'slots' => arr[8].nil? ? nil : Range.new(*arr[8].split('-'))
}
}

HashifyClusterSlots =
lambda { |reply|
reply.map do |arr|
first_slot, last_slot = arr[0..1]
master = { 'ip' => arr[2][0], 'port' => arr[2][1], 'node_id' => arr[2][2] }
replicas = arr[3..-1].map { |r| { 'ip' => r[0], 'port' => r[1], 'node_id' => r[2] } }
{
'start_slot' => first_slot,
'end_slot' => last_slot,
'master' => master,
'replicas' => replicas
}
end
}

HashifyClusterNodes =
lambda { |reply|
reply.split(/[\r\n]+/).map { |str| HashifyClusterNodeInfo.call(str) }
}

HashifyClusterSlaves =
lambda { |reply|
reply.map { |str| HashifyClusterNodeInfo.call(str) }
}

Noop = ->(reply) { reply }

def _geoarguments(*args, options: nil, sort: nil, count: nil)
args.push sort if sort
args.push 'count', count if count
Expand All @@ -2918,11 +3007,11 @@ def _subscription(method, timeout, channels, block)
@client = original
end
end

end

require_relative "redis/version"
require_relative "redis/connection"
require_relative "redis/client"
require_relative "redis/cluster"
require_relative "redis/pipeline"
require_relative "redis/subscribe"
Loading