Skip to content

Commit

Permalink
PERF: perform all cached counting in background (discourse#15991)
Browse files Browse the repository at this point in the history
Previously cached counting made redis calls in main thread and performed
the flush in main thread.

This could lead to pathological states under extremely heavy load.

This refactor reduces load and cleans up the interface.
  • Loading branch information
SamSaffron authored Feb 22, 2022
1 parent 98a7fa3 commit d4d3580
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 357 deletions.
48 changes: 5 additions & 43 deletions app/models/application_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,16 @@ def self.enable
@disabled = false
end

def self.increment!(type, opts = nil)
def self.increment!(req_type)
return if @disabled
perform_increment!(redis_key(type), opts)
perform_increment!(req_type)
end

def self.write_cache!(date = nil)
if date.nil?
write_cache!(Time.now.utc)
write_cache!(Time.now.utc.yesterday)
return
end

self.last_flush = Time.now.utc

date = date.to_date

req_types.each do |req_type, _|
val = get_and_reset(redis_key(req_type, date))

next if val == 0

id = req_id(date, req_type)
where(id: id).update_all(["count = count + ?", val])
end
rescue Redis::CommandError => e
raise unless e.message =~ /READONLY/
nil
# CachedCounting hook: apply +count+ buffered increments for the given
# request type and date directly to the counter column in a single UPDATE.
def self.write_cache!(req_type, count, date)
  where(id: req_id(date, req_type)).update_all(["count = count + ?", count])
end

def self.clear_cache!(date = nil)
if date.nil?
clear_cache!(Time.now.utc)
clear_cache!(Time.now.utc.yesterday)
return
end

req_types.each do |req_type, _|
key = redis_key(req_type, date)
Discourse.redis.del key
end
end

protected

def self.req_id(date, req_type, retries = 0)

req_type_id = req_types[req_type]
Expand All @@ -83,10 +49,6 @@ def self.req_id(date, req_type, retries = 0)
end
end

def self.redis_key(req_type, time = Time.now.utc)
"app_req_#{req_type}#{time.strftime('%Y%m%d')}"
end

def self.stats
s = HashWithIndifferentAccess.new({})

Expand Down
196 changes: 150 additions & 46 deletions app/models/concerns/cached_counting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,75 +3,179 @@
module CachedCounting
extend ActiveSupport::Concern

EXPIRE_CACHE_AFTER = 4.days.to_i

LUA_INCR_AND_EXPIRE = DiscourseRedis::EvalHelper.new <<~LUA
local result = redis.call("INCR", KEYS[1])
if result == 1 then
redis.call("EXPIRE", KEYS[1], ARGV[1])
end
# Atomically read and remove one field from a Redis hash; returns the field
# value (nil when absent). Atomicity matters because increments may race
# with the flush — see flush_to_db.
LUA_HGET_DEL = DiscourseRedis::EvalHelper.new <<~LUA
  local result = redis.call("HGET", KEYS[1], KEYS[2])
  redis.call("HDEL", KEYS[1], KEYS[2])
  return result
LUA

included do
class << self
attr_accessor :autoflush, :autoflush_seconds, :last_flush
QUEUE = Queue.new          # in-process buffer of pending increments
SLEEP_SECONDS = 1          # worker thread wake interval (see thread_loop)
FLUSH_DB_ITERATIONS = 60   # flush Redis -> DB roughly every 60 wakeups
MUTEX = Mutex.new          # serializes worker-thread creation in ensure_thread!

# Stop background counting: mark the module disabled, then wake the worker
# thread and wait for it to exit (the thread loop breaks when not enabled?).
def self.disable
  @enabled = false
  return unless @thread&.alive?

  @thread.wakeup
  @thread.join
end

# auto flush if backlog is larger than this
self.autoflush = 2000
# True unless counting has been explicitly disabled; an unset flag counts
# as enabled.
def self.enabled?
  !(@enabled == false)
end

# auto flush if older than this
self.autoflush_seconds = 5.minutes
# Re-enable background counting after a call to .disable.
def self.enable
  @enabled = true
end

self.last_flush = Time.now.utc
# Test/maintenance helper: forget the thread-creation cooldown, discard all
# queued counts (in-process queue and Redis buffer hash), and clear the
# cross-process DB-flush cooldown key.
def self.reset
  @last_ensure_thread = nil
  clear_queue!
  clear_flush_to_db_lock!
end

class_methods do
def perform_increment!(key, opts = nil)
val = DiscourseRedis.ignore_readonly do
LUA_INCR_AND_EXPIRE.eval(
Discourse.redis.without_namespace,
[Discourse.redis.namespace_key(key)],
[EXPIRE_CACHE_AFTER]
).to_i
end
ENSURE_THREAD_COOLDOWN_SECONDS = 5

# readonly mode it is going to be nil, skip
return if val.nil?
def self.ensure_thread!
return if !enabled?

autoflush = (opts && opts[:autoflush]) || self.autoflush
if autoflush > 0 && val >= autoflush
write_cache!
MUTEX.synchronize do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
delta = @last_ensure_thread && (now - @last_ensure_thread)

if delta && delta < ENSURE_THREAD_COOLDOWN_SECONDS
# creating threads can be very expensive and bog down a process
return
end

if (Time.now.utc - last_flush).to_i > autoflush_seconds
write_cache!
@last_ensure_thread = now

if !@thread&.alive?
@thread = nil
end
@thread ||= Thread.new { thread_loop }
end
end

def self.thread_loop
iterations = 0
while true
break if !enabled?

sleep SLEEP_SECONDS
flush_in_memory
if (iterations >= FLUSH_DB_ITERATIONS) || @flush
iterations = 0
flush_to_db
@flush = false
end
iterations += 1
end

def write_cache!(date = nil)
raise NotImplementedError
rescue => ex
if Redis::CommandError === ex && ex.message =~ /READONLY/
# do not warn for Redis readonly mode
elsif PG::ReadOnlySqlTransaction === ex
# do not warn for PG readonly mode
else
Discourse.warn_exception(
ex,
message: 'Unexpected error while processing cached counts'
)
end
end

# this may seem a bit fancy but in so it allows
# for concurrent calls without double counting
def get_and_reset(key)
Discourse.redis.set(key, '0', ex: EXPIRE_CACHE_AFTER, get: true).to_i
# Force an immediate flush from the worker thread: set the @flush signal,
# wake the thread, then busy-wait until the thread loop clears @flush after
# it has run flush_to_db.
# NOTE(review): raises NoMethodError if the worker thread was never started
# (@thread is nil) — presumably callers ensure the thread first; verify.
def self.flush
  @flush = true
  @thread.wakeup
  while @flush
    sleep 0.001
  end
end

COUNTER_REDIS_HASH = "CounterCacheHash"

def request_id(query_params, retries = 0)
id = where(query_params).pluck_first(:id)
id ||= create!(query_params.merge(count: 0)).id
rescue # primary key violation
if retries == 0
request_id(query_params, 1)
else
raise
# Drain the in-process queue, aggregating increments per
# (class, db, date, key) bucket, then push the aggregated counts into the
# shared Redis hash with HINCRBY. Redis is touched only when something was
# actually queued.
def self.flush_in_memory
  buffered = nil

  # single consumer thread, so a non-blocking drain is safe here
  until QUEUE.empty?
    local_key, klass, db, time = QUEUE.deq
    field = "#{klass},#{db},#{time.strftime("%Y%m%d")},#{local_key}"
    buffered ||= Hash.new(0)
    buffered[field] += 1
  end

  buffered&.each do |redis_field, count|
    Discourse.redis.without_namespace.hincrby(COUNTER_REDIS_HASH, redis_field, count)
  end
end

DB_FLUSH_COOLDOWN_SECONDS = 60                 # min seconds between cluster-wide DB flushes
DB_COOLDOWN_KEY = "cached_counting_cooldown"   # Redis key implementing that cooldown

# Move buffered counters from the shared Redis hash into the database.
# Guarded by a distributed mutex (one process flushes at a time) and a
# cooldown key (at most one DB flush per DB_FLUSH_COOLDOWN_SECONDS across
# the cluster — see allowed_to_flush_to_db?).
def self.flush_to_db
  redis = Discourse.redis.without_namespace
  DistributedMutex.synchronize("flush_counters_to_db", redis: redis, validity: 5.minutes) do
    if allowed_to_flush_to_db?
      redis.hkeys(COUNTER_REDIS_HASH).each do |key|

        # atomically read-and-delete the field so increments racing with
        # this flush are not lost
        val = LUA_HGET_DEL.eval(
          redis,
          [COUNTER_REDIS_HASH, key]
        ).to_i

        # unlikely (protected by mutex), but protect just in case
        # could be a race condition in test
        if val > 0
          # field format "Klass,db,YYYYMMDD,local_key" — see flush_in_memory
          klass_name, db, date, local_key = key.split(",", 4)
          date = Date.strptime(date, "%Y%m%d")
          klass = Module.const_get(klass_name)

          # apply the delta on the site the count was recorded for
          RailsMultisite::ConnectionManagement.with_connection(db) do
            klass.write_cache!(local_key, val, date)
          end
        end
      end
    end
  end
end

# Remove the cooldown key so the next flush_to_db may write to the DB.
def self.clear_flush_to_db_lock!
  Discourse.redis.without_namespace.del(DB_COOLDOWN_KEY)
end

# Seconds until the DB-flush cooldown expires (Redis TTL semantics: the
# value is negative when the key does not exist).
def self.flush_to_db_lock_ttl
  Discourse.redis.without_namespace.ttl(DB_COOLDOWN_KEY)
end

# Cross-process rate limit: SET NX succeeds (truthy) only when the cooldown
# key is absent, so at most one flush runs per DB_FLUSH_COOLDOWN_SECONDS.
def self.allowed_to_flush_to_db?
  Discourse.redis.without_namespace.set(DB_COOLDOWN_KEY, "1", ex: DB_FLUSH_COOLDOWN_SECONDS, nx: true)
end

# Enqueue one increment; captures the current multisite db and UTC time so
# the consumer thread can bucket the count under the right site and date.
def self.queue(key, klass)
  QUEUE.push([key, klass, RailsMultisite::ConnectionManagement.current_db, Time.now.utc])
end

# Discard every pending, unflushed count: the in-process queue and the
# shared Redis hash buffering counts between DB flushes.
def self.clear_queue!
  QUEUE.clear
  Discourse.redis.without_namespace.del(COUNTER_REDIS_HASH)
end

class_methods do
  # Buffer one increment of +key+ for the including model. The count is
  # queued in-process and flushed to Redis, then to the DB, by the shared
  # background thread.
  def perform_increment!(key)
    CachedCounting.ensure_thread!
    CachedCounting.queue(key, self)
  end

  # Hook invoked during the DB flush: apply +count+ buffered increments
  # for (key, date). Including classes must implement this (e.g.
  # ApplicationRequest, WebCrawlerRequest).
  def write_cache!(key, count, date)
    raise NotImplementedError
  end

end
end
70 changes: 14 additions & 56 deletions app/models/web_crawler_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
class WebCrawlerRequest < ActiveRecord::Base
include CachedCounting

# auto flush if older than this
self.autoflush_seconds = 1.hour

cattr_accessor :max_record_age, :max_records_per_day

# only keep the top records based on request count
Expand All @@ -14,67 +11,28 @@ class WebCrawlerRequest < ActiveRecord::Base
# delete records older than this
self.max_record_age = 30.days

def self.increment!(user_agent, opts = nil)
ua_list_key = user_agent_list_key
Discourse.redis.sadd(ua_list_key, user_agent)
Discourse.redis.expire(ua_list_key, 259200) # 3.days

perform_increment!(redis_key(user_agent), opts)
end

def self.write_cache!(date = nil)
if date.nil?
write_cache!(Time.now.utc)
write_cache!(Time.now.utc.yesterday)
return
end

self.last_flush = Time.now.utc

date = date.to_date
ua_list_key = user_agent_list_key(date)

while user_agent = Discourse.redis.spop(ua_list_key)
val = get_and_reset(redis_key(user_agent, date))

next if val == 0

self.where(id: req_id(date, user_agent)).update_all(["count = count + ?", val])
end
rescue Redis::CommandError => e
raise unless e.message =~ /READONLY/
nil
# Buffer one request count for this crawler +user_agent+; the actual
# counting and DB write happen in the CachedCounting background thread.
def self.increment!(user_agent)
  perform_increment!(user_agent)
end

def self.clear_cache!(date = nil)
if date.nil?
clear_cache!(Time.now.utc)
clear_cache!(Time.now.utc.yesterday)
return
end

ua_list_key = user_agent_list_key(date)

while user_agent = Discourse.redis.spop(ua_list_key)
Discourse.redis.del redis_key(user_agent, date)
end

Discourse.redis.del(ua_list_key)
# CachedCounting hook: add +count+ to the row for (date, user_agent),
# creating the row first when necessary (request_id handles creation).
def self.write_cache!(user_agent, count, date)
  row_id = request_id(date: date, user_agent: user_agent)
  where(id: row_id).update_all(["count = count + ?", count])
end

protected

def self.user_agent_list_key(time = Time.now.utc)
"crawl_ua_list:#{time.strftime('%Y%m%d')}"
end

def self.redis_key(user_agent, time = Time.now.utc)
"crawl_req:#{time.strftime('%Y%m%d')}:#{user_agent}"
# Return the id of the row for (date, user_agent), creating it with a zero
# count when missing. A concurrent create can make create! fail with a
# unique-constraint violation, in which case the lookup is retried once.
# NOTE(review): the bare rescue catches any StandardError, not only
# primary-key violations — errors surface only after the single retry.
def self.request_id(date:, user_agent:, retries: 0)
  id = where(date: date, user_agent: user_agent).pluck_first(:id)
  id ||= create!({ date: date, user_agent: user_agent }.merge(count: 0)).id
rescue # primary key violation
  if retries == 0
    request_id(date: date, user_agent: user_agent, retries: 1)
  else
    raise
  end
end

# Positional-argument wrapper around request_id.
def self.req_id(date, user_agent)
  request_id(date: date, user_agent: user_agent)
end
end

# == Schema Information
Expand Down
Loading

0 comments on commit d4d3580

Please sign in to comment.