forked from discourse/discourse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiscourse_redis.rb
306 lines (253 loc) · 7.93 KB
/
discourse_redis.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# frozen_string_literal: true
#
# A wrapper around redis that namespaces keys with the current site id
#
class DiscourseRedis
class FallbackHandler
include Singleton
MASTER_ROLE_STATUS = "role:master".freeze
MASTER_LOADING_STATUS = "loading:1".freeze
MASTER_LOADED_STATUS = "loading:0".freeze
CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze)
def initialize
@master = true
@running = false
@mutex = Mutex.new
@slave_config = DiscourseRedis.slave_config
@message_bus_keepalive_interval = MessageBus.keepalive_interval
end
def verify_master
synchronize do
return if @thread && @thread.alive?
@thread = Thread.new do
loop do
begin
thread = Thread.new { initiate_fallback_to_master }
thread.join
break if synchronize { @master }
sleep 5
ensure
thread.kill
end
end
end
end
end
def initiate_fallback_to_master
success = false
begin
redis_config = DiscourseRedis.config.dup
redis_config.delete(:connector)
master_client = ::Redis::Client.new(redis_config)
logger.warn "#{log_prefix}: Checking connection to master server..."
info = master_client.call([:info])
if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS)
begin
logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..."
self.master = true
slave_client = ::Redis::Client.new(@slave_config)
CONNECTION_TYPES.each do |connection_type|
slave_client.call([:client, [:kill, 'type', connection_type]])
end
MessageBus.keepalive_interval = @message_bus_keepalive_interval
Discourse.clear_readonly!
Discourse.request_refresh!
success = true
ensure
slave_client&.disconnect
end
end
rescue => e
logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'"
ensure
master_client&.disconnect
end
success
end
def master
synchronize { @master }
end
def master=(args)
synchronize do
@master = args
# Disables MessageBus keepalive when Redis is in readonly mode
MessageBus.keepalive_interval = 0 if !@master
end
end
private
def synchronize
@mutex.synchronize { yield }
end
def logger
Rails.logger
end
def log_prefix
"#{self.class}"
end
end
class Connector < Redis::Client::Connector
def initialize(options)
super(options)
@slave_options = DiscourseRedis.slave_config(options)
@fallback_handler = DiscourseRedis::FallbackHandler.instance
end
def resolve(client = nil)
if !@fallback_handler.master
@fallback_handler.verify_master
return @slave_options
end
begin
options = @options.dup
options.delete(:connector)
client ||= Redis::Client.new(options)
loading = client.call([:info, :persistence]).include?(
DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS
)
loading ? @slave_options : @options
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
@fallback_handler.master = false
@fallback_handler.verify_master
raise ex
ensure
client.disconnect
end
end
end
def self.raw_connection(config = nil)
config ||= self.config
Redis.new(config)
end
def self.config
GlobalSetting.redis_config
end
def self.slave_config(options = config)
options.dup.merge!(host: options[:slave_host], port: options[:slave_port])
end
def initialize(config = nil, namespace: true)
@config = config || DiscourseRedis.config
@redis = DiscourseRedis.raw_connection(@config.dup)
@namespace = namespace
end
def self.fallback_handler
@fallback_handler ||= DiscourseRedis::FallbackHandler.instance
end
def without_namespace
# Only use this if you want to store and fetch data that's shared between sites
@redis
end
def self.ignore_readonly
yield
rescue Redis::CommandError => ex
if ex.message =~ /READONLY/
unless Discourse.recently_readonly? || Rails.env.test?
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
end
fallback_handler.verify_master if !fallback_handler.master
Discourse.received_redis_readonly!
nil
else
raise ex
end
end
# prefix the key with the namespace
def method_missing(meth, *args, &block)
if @redis.respond_to?(meth)
DiscourseRedis.ignore_readonly { @redis.public_send(meth, *args, &block) }
else
super
end
end
# Proxy key methods through, but prefix the keys with the namespace
[:append, :blpop, :brpop, :brpoplpush, :decr, :decrby, :exists, :expire, :expireat, :get, :getbit, :getrange, :getset,
:hdel, :hexists, :hget, :hgetall, :hincrby, :hincrbyfloat, :hkeys, :hlen, :hmget, :hmset, :hset, :hsetnx, :hvals, :incr,
:incrby, :incrbyfloat, :lindex, :linsert, :llen, :lpop, :lpush, :lpushx, :lrange, :lrem, :lset, :ltrim,
:mapped_hmset, :mapped_hmget, :mapped_mget, :mapped_mset, :mapped_msetnx, :move, :mset,
:msetnx, :persist, :pexpire, :pexpireat, :psetex, :pttl, :rename, :renamenx, :rpop, :rpoplpush, :rpush, :rpushx, :sadd, :scard,
:sdiff, :set, :setbit, :setex, :setnx, :setrange, :sinter, :sismember, :smembers, :sort, :spop, :srandmember, :srem, :strlen,
:sunion, :ttl, :type, :watch, :zadd, :zcard, :zcount, :zincrby, :zrange, :zrangebyscore, :zrank, :zrem, :zremrangebyrank,
:zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank, :zrangebyscore ].each do |m|
define_method m do |*args|
args[0] = "#{namespace}:#{args[0]}" if @namespace
DiscourseRedis.ignore_readonly { @redis.public_send(m, *args) }
end
end
def mget(*args)
args.map! { |a| "#{namespace}:#{a}" } if @namespace
DiscourseRedis.ignore_readonly { @redis.mget(*args) }
end
def del(k)
DiscourseRedis.ignore_readonly do
k = "#{namespace}:#{k}" if @namespace
@redis.del k
end
end
def scan_each(options = {}, &block)
DiscourseRedis.ignore_readonly do
match = options[:match].presence || '*'
options[:match] =
if @namespace
"#{namespace}:#{match}"
else
match
end
if block
@redis.scan_each(options) do |key|
key = remove_namespace(key) if @namespace
block.call(key)
end
else
@redis.scan_each(options).map do |key|
key = remove_namespace(key) if @namespace
key
end
end
end
end
def keys(pattern = nil)
DiscourseRedis.ignore_readonly do
pattern = pattern || '*'
pattern = "#{namespace}:#{pattern}" if @namespace
keys = @redis.keys(pattern)
if @namespace
len = namespace.length + 1
keys.map! { |k| k[len..-1] }
end
keys
end
end
def delete_prefixed(prefix)
DiscourseRedis.ignore_readonly do
keys("#{prefix}*").each { |k| Discourse.redis.del(k) }
end
end
def flushdb
DiscourseRedis.ignore_readonly do
keys.each { |k| del(k) }
end
end
def reconnect
@redis._client.reconnect
end
def namespace_key(key)
if @namespace
"#{namespace}:#{key}"
else
key
end
end
def namespace
RailsMultisite::ConnectionManagement.current_db
end
def self.namespace
Rails.logger.warn("DiscourseRedis.namespace is going to be deprecated, do not use it!")
RailsMultisite::ConnectionManagement.current_db
end
def self.new_redis_store
Cache.new
end
private
def remove_namespace(key)
key[(namespace.length + 1)..-1]
end
end