Skip to content

Commit

Permalink
Revert "Merge pull request #35 from abicky/reimplement-efficient-poll…
Browse files Browse the repository at this point in the history
…ing-thread"

This reverts commit 1528542, reversing
changes made to e69e027.
  • Loading branch information
joker1007 committed Aug 20, 2022
1 parent 573560f commit 84dbefb
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 18 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,10 @@ And other options is following.

```
$ crono_trigger --help
Usage: crono_trigger [options] [MODEL..]
If MODEL is not given, Search classes including CronoTrigger::Schedulable module automatically.
-w, --worker-id=ID Worker ID (default: First local ip address which is not loopback
Usage: crono_trigger [options] MODEL [MODEL..]
-f, --config-file=CONFIG Config file (ex. ./crono_trigger.rb)
-e, --environment=ENV Set environment name (ex. development, production)
-p, --polling-thread=SIZE Polling thread size (Default: Min of (target model count or processor_count)
-p, --polling-thread=SIZE Polling thread size (Default: 1)
-i, --polling-interval=SECOND Polling interval seconds (Default: 5)
-c, --concurrency=SIZE Execute thread size (Default: 25)
-l, --log=LOGFILE Set log output destination (Default: STDOUT or ./crono_trigger.log if daemonize is true)
Expand Down
10 changes: 4 additions & 6 deletions lib/crono_trigger/polling_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ def initialize(model_queue, stop_flag, logger, executor, execution_counter)
@stop_flag = stop_flag
@logger = logger
@executor = executor
if @executor.fallback_policy != :caller_runs
raise ArgumentError, "executor's fallback policies except for :caller_runs are not supported"
end
@execution_counter = execution_counter
@quiet = Concurrent::AtomicBoolean.new(false)
end
Expand Down Expand Up @@ -52,15 +55,12 @@ def alive?
def poll(model)
@logger.info "(polling-thread-#{Thread.current.object_id}) Poll #{model}"

queue_empty_event = Concurrent::Event.new
maybe_has_next = true
while maybe_has_next && !@stop_flag.set?
queue_empty_event.wait unless @executor.queue_length.zero?
records, maybe_has_next = model.connection_pool.with_connection do
model.executables_with_lock(limit: @executor.remaining_capacity)
model.executables_with_lock
end

queue_empty_event.reset
records.each do |record|
@executor.post do
@execution_counter.increment
Expand All @@ -69,8 +69,6 @@ def poll(model)
ensure
@execution_counter.decrement
end

queue_empty_event.set if @executor.queue_length.zero?
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/crono_trigger/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def initialize
min_threads: 1,
max_threads: CronoTrigger.config.executor_thread,
max_queue: CronoTrigger.config.executor_thread * 2,
fallback_policy: :caller_runs,
)
@execution_counter = Concurrent::AtomicFixnum.new
@logger = Logger.new(STDOUT) unless @logger
Expand Down
9 changes: 3 additions & 6 deletions spec/crono_trigger/polling_thread_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@

let(:immediate_executor_class_with_fallabck_policy) do
Class.new(Concurrent::ImmediateExecutor) do
def queue_length
0
end

def remaining_capacity
CronoTrigger.config.executor_thread * 2
def initialize(*args, **kwargs)
super
@fallback_policy = :caller_runs
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions spec/integration/execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def initialize

before do
allow_any_instance_of(Notification).to receive(:execute) do
sleep 0.5
sleep 1
end

now = Time.now
Expand All @@ -37,7 +37,7 @@ def initialize
it "processes all the records without returning from #poll" do
worker = worker_class.new
Thread.start { worker.run }
sleep CronoTrigger.config.polling_interval + 3
sleep CronoTrigger.config.polling_interval + 2

expect(Notification.executables).not_to be_exists
ensure
Expand Down

0 comments on commit 84dbefb

Please sign in to comment.