forked from discourse/discourse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobs.rb
176 lines (149 loc) · 5 KB
/
jobs.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
module Jobs
# Returns the `enqueued` figure reported by a fresh Sidekiq::Stats snapshot.
def self.queued
  stats = Sidekiq::Stats.new
  stats.enqueued
end
# Reads the 'last_job_perform_at' key through Sidekiq's Redis connection and
# converts the stored value (integer seconds) to a Time.
#
# Returns nil when the key holds no value.
def self.last_job_performed_at
  Sidekiq.redis do |redis|
    stamp = redis.get('last_job_perform_at')
    Time.at(stamp.to_i) if stamp
  end
end
# Counts the entries of Sidekiq's retry set whose job class name ends in "Email".
def self.num_email_retry_jobs
  retries = Sidekiq::RetrySet.new
  retries.count { |job| job.klass =~ /Email$/ }
end
# Common parent for jobs: a Sidekiq worker whose #perform decides where and
# how the job body (#execute) is run.
class Base
  include Sidekiq::Worker

  # Prints every argument to stdout, then writes one info line per argument
  # to the Rails logger, prefixed with a timestamp and the upcased class name.
  #
  # Always returns true.
  def log(*args)
    puts args
    args.each do |message|
      timestamp = Time.now.to_formatted_s(:db)
      label = self.class.name.upcase
      Rails.logger.info "#{timestamp}: [#{label}] #{message}"
    end
    true
  end

  # Builds a new instance of the job class and calls #perform on it with opts.
  def self.delayed_perform(opts={})
    new.perform(opts)
  end

  # The job body. This default implementation only raises; subclasses are
  # expected to replace it.
  def execute(opts={})
    raise "Overwrite me!"
  end

  # Entry point for running the job.
  #
  # The trailing options hash is extracted from args and given indifferent
  # access. Recognised keys:
  #   :sync_exec       - when truthy, #execute runs inline in the current
  #                      thread and its result is returned.
  #   :current_site_id - restricts the run to that single database; without
  #                      it every database known to RailsMultisite is visited.
  #
  # Raises ArgumentError when :sync_exec is combined with a :current_site_id
  # that differs from the current database. An exception raised by #execute
  # in a worker thread is re-raised in the calling thread.
  def perform(*args)
    opts = args.extract_options!.with_indifferent_access

    # Record when a job last ran, but only while jobs are actually queued.
    if SiteSetting.queue_jobs?
      Sidekiq.redis { |redis| redis.set('last_job_perform_at', Time.now.to_i) }
    end

    if opts.delete(:sync_exec)
      wrong_site = opts.key?(:current_site_id) &&
                   opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
      if wrong_site
        raise ArgumentError, "You can't connect to another database when executing a job synchronously."
      end
      return execute(opts)
    end

    site_id = opts[:current_site_id]
    dbs = site_id ? [site_id] : RailsMultisite::ConnectionManagement.all_dbs

    dbs.each do |db|
      failure = nil

      # NOTE: This looks odd, in fact it looks crazy, but there is a reason.
      # A bug in therubyracer means that under certain conditions running in a fiber
      # can corrupt the whole v8 context so badly that it will hang sidekiq.
      #
      # If you are brave and want to try to fix this either in celluloid or therubyracer, the repro is:
      #
      # 1. Create a big Discourse db: (you can start from script/profile_db_generator.rb)
      # 2. Queue a ton of jobs, eg: User.pluck(:id).each{|id| Jobs.enqueue(:user_email, type: :digest, user_id: id)};
      # 3. Run sidekiq
      #
      # The issue only happens in Ruby 2.0 for some reason; you start getting V8::Error with no context.
      #
      # See: https://github.com/cowboyd/therubyracer/issues/206
      #
      # The restricted stack space of fibers opens up a bunch of risks; by avoiding them altogether
      # we mitigate those at the cost of a very marginal amount of throughput.
      #
      # Ideally we could just tell sidekiq to avoid fibers.
      worker = Thread.new do
        begin
          RailsMultisite::ConnectionManagement.establish_connection(db: db)
          I18n.locale = SiteSetting.default_locale
          execute(opts)
        rescue => error
          # Carry the error out of the thread so the caller can re-raise it.
          failure = error
        ensure
          ActiveRecord::Base.connection_handler.clear_active_connections!
        end
      end
      worker.join

      raise failure if failure
    end
  ensure
    ActiveRecord::Base.connection_handler.clear_active_connections!
  end
end
# Job base class that inherits everything from Base and additionally mixes in
# Sidetiq::Schedulable.
# NOTE(review): presumably the parent for recurring jobs — confirm against the
# job classes loaded from lib/jobs.
class Scheduled < Base
include Sidetiq::Schedulable
end
# Runs or queues the job class named by job_name (resolved as
# "Jobs::<CamelCasedName>").
#
# Options consumed here (all other keys are passed on to the job):
#   :all_sites       - when truthy, the job is not pinned to a site.
#   :current_site_id - site to run for; defaults to the current database
#                      unless :all_sites was given.
#   :delay_for       - delay handed to the job class's delay_for when queueing.
#
# When SiteSetting.queue_jobs? is false the job is performed immediately with
# :sync_exec set and any :delay_for discarded.
#
# Note that the opts hash passed in is modified in place.
def self.enqueue(job_name, opts={})
  klass = "Jobs::#{job_name.to_s.camelcase}".constantize

  # Pin the job to the current site unless the caller asked for all sites.
  opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db unless opts.delete(:all_sites)

  # Queueing is disabled: execute the job right away.
  unless SiteSetting.queue_jobs?
    opts.delete(:delay_for)
    opts[:sync_exec] = true
    return klass.new.perform(opts)
  end

  # Queueing is enabled: schedule with a delay when one was requested.
  if opts[:delay_for].present?
    delay = opts.delete(:delay_for)
    klass.delay_for(delay).delayed_perform(opts)
  else
    Sidekiq::Client.enqueue(klass, opts)
  end
end
# Enqueues job_name so that it runs after a delay of secs, by passing
# :delay_for to Jobs.enqueue.
#
# secs     - delay handed to enqueue as :delay_for.
# job_name - name of the job, as accepted by enqueue.
# opts     - job options; this hash is NOT modified. A copy with :delay_for
#            added is what enqueue receives, so a caller may safely reuse (or
#            freeze) the hash it passes in.
#
# Returns whatever enqueue returns.
def self.enqueue_in(secs, job_name, opts={})
  # Non-destructive merge: enqueue mutates the hash it is given, so hand it
  # a private copy instead of the caller's object.
  enqueue(job_name, opts.merge(delay_for: secs))
end
# Enqueues job_name to run at datetime by converting it to a whole number of
# seconds from Time.zone.now and delegating to enqueue_in. A datetime in the
# past is treated as a delay of 0 seconds.
def self.enqueue_at(datetime, job_name, opts={})
  seconds_from_now = (datetime - Time.zone.now).to_i
  seconds_from_now = 0 if seconds_from_now < 0
  enqueue_in(seconds_from_now, job_name, opts)
end
# Deletes every scheduled job returned by scheduled_for(job_name, params).
#
# Returns false when nothing matched, true after the matches were deleted.
def self.cancel_scheduled_job(job_name, params={})
  matches = scheduled_for(job_name, params)
  return false if matches.empty?

  matches.each(&:delete)
  true
end
# Selects the entries of Sidekiq's scheduled set that are delayed class calls
# ('Sidekiq::Extensions::DelayedClass') targeting "Jobs::<CamelCasedName>" and
# whose first call argument matches every key/value pair in params.
#
# The first element of each entry's args is decoded with YAML.load and read as
# [target class, <ignored>, [first argument, ...]].
# NOTE(review): YAML.load is applied to data read back from the scheduled set;
# presumably only Sidekiq itself writes it — confirm before exposing this to
# anything less trusted.
#
# Returns an Array of the matching scheduled entries (empty when none match).
def self.scheduled_for(job_name, params={})
  job_class = "Jobs::#{job_name.to_s.camelcase}"

  Sidekiq::ScheduledSet.new.select do |entry|
    next false unless entry.klass == 'Sidekiq::Extensions::DelayedClass'

    payload = YAML.load(entry.args[0])
    target, _, (first_arg, *) = payload
    next false unless target.to_s == job_class && first_arg

    # An empty params hash matches any entry for the job class.
    params.all? { |key, value| first_arg[key] == value }
  end
end
end
# Load every entry found directly under lib/jobs so the job classes are defined.
Dir.glob("#{Rails.root}/lib/jobs/*").each do |job_file|
  require_dependency job_file
end