@@ -31,12 +31,16 @@ def self.current=(redis)
31
31
# @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not
32
32
# @option options [Array] :sentinels List of sentinels to contact
33
33
# @option options [Symbol] :role (:master) Role to fetch via Sentinel, either `:master` or `:slave`
34
+ # @option options [Array<String, Hash{Symbol => String, Integer}>] :cluster List of cluster nodes to contact
35
+ # @option options [Boolean] :replica Whether to use readonly replica nodes in Redis Cluster or not
34
36
# @option options [Class] :connector Class of custom connector
35
37
#
36
38
# @return [Redis] a new client instance
37
39
def initialize ( options = { } )
38
40
@options = options . dup
39
- @original_client = @client = Client . new ( options )
41
+ @cluster_mode = options . key? ( :cluster )
42
+ client = @cluster_mode ? Cluster : Client
43
+ @original_client = @client = client . new ( options )
40
44
@queue = Hash . new { |h , k | h [ k ] = [ ] }
41
45
42
46
super ( ) # Monitor#initialize
@@ -274,9 +278,7 @@ def info(cmd = nil)
274
278
synchronize do |client |
275
279
client . call ( [ :info , cmd ] . compact ) do |reply |
276
280
if reply . kind_of? ( String )
277
- reply = Hash [ reply . split ( "\r \n " ) . map do |line |
278
- line . split ( ":" , 2 ) unless line =~ /^(#|$)/
279
- end . compact ]
281
+ reply = HashifyInfo . call ( reply )
280
282
281
283
if cmd && cmd . to_s == "commandstats"
282
284
# Extract nested hashes for INFO COMMANDSTATS
@@ -2818,6 +2820,41 @@ def sentinel(subcommand, *args)
2818
2820
end
2819
2821
end
2820
2822
2823
+ # Sends `CLUSTER *` command to random node and returns its reply.
2824
+ #
2825
+ # @see https://redis.io/commands#cluster Reference of cluster command
2826
+ #
2827
+ # @param subcommand [String, Symbol] the subcommand of cluster command
2828
+ # e.g. `:slots`, `:nodes`, `:slaves`, `:info`
2829
+ #
2830
+ # @return [Object] depends on the subcommand
2831
+ def cluster ( subcommand , *args )
2832
+ subcommand = subcommand . to_s . downcase
2833
+ block = case subcommand
2834
+ when 'slots' then HashifyClusterSlots
2835
+ when 'nodes' then HashifyClusterNodes
2836
+ when 'slaves' then HashifyClusterSlaves
2837
+ when 'info' then HashifyInfo
2838
+ else Noop
2839
+ end
2840
+
2841
+ # @see https://github.com/antirez/redis/blob/unstable/src/redis-trib.rb#L127 raw reply expected
2842
+ block = Noop unless @cluster_mode
2843
+
2844
+ synchronize do |client |
2845
+ client . call ( [ :cluster , subcommand ] + args , &block )
2846
+ end
2847
+ end
2848
+
2849
+ # Sends `ASKING` command to random node and returns its reply.
2850
+ #
2851
+ # @see https://redis.io/topics/cluster-spec#ask-redirection ASK redirection
2852
+ #
2853
+ # @return [String] `'OK'`
2854
+ def asking
2855
+ synchronize { |client | client . call ( %i[ asking ] ) }
2856
+ end
2857
+
2821
2858
def id
2822
2859
@original_client . id
2823
2860
end
@@ -2831,6 +2868,8 @@ def dup
2831
2868
end
2832
2869
2833
2870
def connection
2871
+ return @original_client . connection_info if @cluster_mode
2872
+
2834
2873
{
2835
2874
host : @original_client . host ,
2836
2875
port : @original_client . port ,
@@ -2896,6 +2935,56 @@ def method_missing(command, *args)
2896
2935
end
2897
2936
}
2898
2937
2938
+ HashifyInfo =
2939
+ lambda { |reply |
2940
+ Hash [ reply . split ( "\r \n " ) . map do |line |
2941
+ line . split ( ':' , 2 ) unless line =~ /^(#|$)/
2942
+ end . compact ]
2943
+ }
2944
+
2945
+ HashifyClusterNodeInfo =
2946
+ lambda { |str |
2947
+ arr = str . split ( ' ' )
2948
+ {
2949
+ 'node_id' => arr [ 0 ] ,
2950
+ 'ip_port' => arr [ 1 ] ,
2951
+ 'flags' => arr [ 2 ] . split ( ',' ) ,
2952
+ 'master_node_id' => arr [ 3 ] ,
2953
+ 'ping_sent' => arr [ 4 ] ,
2954
+ 'pong_recv' => arr [ 5 ] ,
2955
+ 'config_epoch' => arr [ 6 ] ,
2956
+ 'link_state' => arr [ 7 ] ,
2957
+ 'slots' => arr [ 8 ] . nil? ? nil : Range . new ( *arr [ 8 ] . split ( '-' ) )
2958
+ }
2959
+ }
2960
+
2961
+ HashifyClusterSlots =
2962
+ lambda { |reply |
2963
+ reply . map do |arr |
2964
+ first_slot , last_slot = arr [ 0 ..1 ]
2965
+ master = { 'ip' => arr [ 2 ] [ 0 ] , 'port' => arr [ 2 ] [ 1 ] , 'node_id' => arr [ 2 ] [ 2 ] }
2966
+ replicas = arr [ 3 ..-1 ] . map { |r | { 'ip' => r [ 0 ] , 'port' => r [ 1 ] , 'node_id' => r [ 2 ] } }
2967
+ {
2968
+ 'start_slot' => first_slot ,
2969
+ 'end_slot' => last_slot ,
2970
+ 'master' => master ,
2971
+ 'replicas' => replicas
2972
+ }
2973
+ end
2974
+ }
2975
+
2976
+ HashifyClusterNodes =
2977
+ lambda { |reply |
2978
+ reply . split ( /[\r \n ]+/ ) . map { |str | HashifyClusterNodeInfo . call ( str ) }
2979
+ }
2980
+
2981
+ HashifyClusterSlaves =
2982
+ lambda { |reply |
2983
+ reply . map { |str | HashifyClusterNodeInfo . call ( str ) }
2984
+ }
2985
+
2986
+ Noop = -> ( reply ) { reply }
2987
+
2899
2988
def _geoarguments ( *args , options : nil , sort : nil , count : nil )
2900
2989
args . push sort if sort
2901
2990
args . push 'count' , count if count
@@ -2918,11 +3007,11 @@ def _subscription(method, timeout, channels, block)
2918
3007
@client = original
2919
3008
end
2920
3009
end
2921
-
2922
3010
end
2923
3011
2924
3012
require_relative "redis/version"
2925
3013
require_relative "redis/connection"
2926
3014
require_relative "redis/client"
3015
+ require_relative "redis/cluster"
2927
3016
require_relative "redis/pipeline"
2928
3017
require_relative "redis/subscribe"
0 commit comments