Skip to content

Commit ffe6d9b

Browse files
committed
WIP
1 parent 8fc8504 commit ffe6d9b

File tree

7 files changed

+146
-31
lines changed

7 files changed

+146
-31
lines changed

lib/job-iteration.rb

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require "active_job"
44
require_relative "./job-iteration/version"
55
require_relative "./job-iteration/enumerator_builder"
6+
require_relative "./job-iteration/interruption_adapters"
67
require_relative "./job-iteration/iteration"
78
require_relative "./job-iteration/log_subscriber"
89
require_relative "./job-iteration/railtie"
@@ -60,10 +61,21 @@ def logger
6061
# where the throttle backoff value will take precedence over this setting.
6162
attr_accessor :default_retry_backoff
6263

63-
# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
64-
attr_accessor :interruption_adapter
64+
# DEPRECATED - Overrides interruption checks based on queue adapter.
65+
# To customize the interruption checks, use register_interruption_adapter(:my_queue_adapter_name) instead.
66+
attr_reader :interruption_adapter
6567

66-
self.interruption_adapter = -> { false }
68+
def interruption_adapter=(adapter)
69+
Deprecation.warn("Setting JobIteration.interruption_adapter is deprecated."\
70+
" Use JobIteration.register_interruption_adapter(:foo, callable) instead"\
71+
" to register the callable (a proc, method, or other object responding to #call)"\
72+
" as the interruption adapter for queue adapter :foo.")
73+
@interruption_adapter = adapter
74+
end
75+
76+
def register_interruption_adapter(adapter_name, adapter)
77+
InterruptionAdapters.register(adapter_name, adapter)
78+
end
6779

6880
# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
6981
# @example
@@ -76,29 +88,4 @@ def logger
7688
attr_accessor :enumerator_builder
7789

7890
self.enumerator_builder = JobIteration::EnumeratorBuilder
79-
80-
def load_integrations
81-
loaded = nil
82-
INTEGRATIONS.each do |integration|
83-
load_integration(integration)
84-
if loaded
85-
raise IntegrationLoadError,
86-
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
87-
"Iteration will only work with one integration."
88-
end
89-
loaded = integration
90-
rescue LoadError
91-
end
92-
end
93-
94-
def load_integration(integration)
95-
unless INTEGRATIONS.include?(integration)
96-
raise IntegrationLoadError,
97-
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
98-
end
99-
100-
require_relative "./job-iteration/integrations/#{integration}"
101-
end
10291
end
103-
104-
JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# frozen_string_literal: true
2+
3+
require_relative "interruption_adapters/null_adapter"
4+
5+
module JobIteration
6+
module InterruptionAdapters
7+
class << self
8+
# Returns adapter for specified name.
9+
#
10+
# JobIteration::InterruptionAdapters.lookup(:sidekiq)
11+
# # => JobIteration::InterruptionAdapters::SidekiqAdapter
12+
def lookup(name)
13+
registry.fetch(name.to_sym) do
14+
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(1))
15+
No interruption adapter is registered for #{name.inspect}; falling back to `NullAdapter`, which never interrupts.
16+
Use `JobIteration.register_queue_adapter(#{name.to_sym.inspect}, <adapter>) to register one.
17+
This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
18+
DEPRECATION_MESSAGE
19+
20+
NullAdapter
21+
end
22+
end
23+
24+
# Registers adapter for specified name.
25+
#
26+
# JobIteration::InterruptionAdapters.register(:sidekiq, MyCustomSidekiqAdapter)
27+
def register(name, adapter)
28+
raise ArgumentError, "adapter must be callable" unless adapter.respond_to?(:call)
29+
30+
registry[name.to_sym] = adapter
31+
end
32+
33+
private
34+
35+
attr_reader :registry
36+
end
37+
38+
@registry = {}
39+
40+
# Built-in Rails adapters. It doesn't make sense to interrupt for these.
41+
register(:async, NullAdapter)
42+
register(:inline, NullAdapter)
43+
register(:test, NullAdapter)
44+
45+
# External adapters
46+
begin
47+
require "resque"
48+
require_relative "interruption_adapters/resque_adapter"
49+
register(:resque, ResqueAdapter)
50+
rescue LoadError
51+
# Resque is not available, no need to load the adapter
52+
end
53+
54+
begin
55+
require "sidekiq"
56+
require_relative "interruption_adapters/sidekiq_adapter"
57+
register(:sidekiq, SidekiqAdapter)
58+
rescue LoadError
59+
# Sidekiq is not available, no need to load the adapter
60+
end
61+
end
62+
end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# frozen_string_literal: true
2+
3+
# This adapter never interrupts.
4+
module JobIteration
5+
module InterruptionAdapters
6+
module NullAdapter
7+
class << self
8+
def call
9+
false
10+
end
11+
end
12+
end
13+
end
14+
end
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# frozen_string_literal: true
2+
3+
module JobIteration
4+
module InterruptionAdapters
5+
module ResqueAdapter
6+
# @private
7+
module IterationExtension
8+
def initialize(*)
9+
$resque_worker = self # rubocop:disable Style/GlobalVars
10+
super
11+
end
12+
end
13+
14+
# @private
15+
module ::Resque
16+
class Worker
17+
# The patch is required in order to call shutdown? on a Resque::Worker instance
18+
prepend(IterationExtension)
19+
end
20+
end
21+
22+
class << self
23+
def call
24+
$resque_worker.try!(:shutdown?) # rubocop:disable Style/GlobalVars
25+
end
26+
end
27+
end
28+
end
29+
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
module JobIteration
4+
module InterruptionAdapters
5+
module SidekiqAdapter
6+
class << self
7+
def call
8+
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
9+
Sidekiq::CLI.instance.launcher.stopping?
10+
else
11+
false
12+
end
13+
end
14+
end
15+
end
16+
end
17+
end

lib/job-iteration/iteration.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ def around_iterate(&blk)
7575
set_callback(:iterate, :around, &blk)
7676
end
7777

78+
def interruption_adapter
79+
JobIteration.interruption_adapter || JobIteration::InterruptionAdapters.lookup(queue_adapter_name)
80+
end
81+
7882
private
7983

8084
def ban_perform_definition
@@ -89,6 +93,7 @@ def initialize(*arguments)
8993
self.times_interrupted = 0
9094
self.total_time = 0.0
9195
assert_implements_methods!
96+
@interruption_adapter = self.class.interruption_adapter
9297
end
9398
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)
9499

@@ -120,6 +125,9 @@ def retry_job(*, **)
120125

121126
private
122127

128+
# @api private
129+
attr_reader :interruption_adapter
130+
123131
def enumerator_builder
124132
JobIteration.enumerator_builder.new(self)
125133
end
@@ -280,7 +288,7 @@ def job_should_exit?
280288
max_job_runtime = job_iteration_max_job_runtime
281289
return true if max_job_runtime && start_time && (Time.now.utc - start_time) > max_job_runtime
282290

283-
JobIteration.interruption_adapter.call || (defined?(super) && super)
291+
interruption_adapter.call || (defined?(super) && super)
284292
end
285293

286294
def job_iteration_max_job_runtime

test/test_helper.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
44
require "minitest/autorun"
55

6-
ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"
7-
86
require "job-iteration"
97
require "job-iteration/test_helper"
108

0 commit comments

Comments
 (0)