Skip to content

Commit

Permalink
Merge pull request #120 from daipom/add-interval-restarting-worker
Browse files Browse the repository at this point in the history
Add interval to restart worker
  • Loading branch information
ashie authored Jun 10, 2022
2 parents 3c9da40 + 0c4a9e9 commit dcc591a
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 24 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ Available methods are different depending on `worker_type`. ServerEngine support
- **server_detach_wait** sets wait time before starting live restart (default: 10.0) [dynamic reloadable]
- Multithread server and multiprocess server: available only when `worker_type` is thread or process
- **workers** sets number of workers (default: 1) [dynamic reloadable]
- **start_worker_delay** sets wait time before starting a new worker (default: 0) [dynamic reloadable]
- **start_worker_delay** sets the delay between each worker-start when starting/restarting multiple workers at once (default: 0) [dynamic reloadable]
- **start_worker_delay_rand** randomizes start_worker_delay at this ratio (default: 0.2) [dynamic reloadable]
- **restart_worker_interval** sets wait time before restarting a stopped worker (default: 0) [dynamic reloadable]
- Multiprocess server: available only when `worker_type` is "process"
- **worker_process_name** changes process name ($0) of workers [dynamic reloadable]
- **worker_heartbeat_interval** sets interval of heartbeats in seconds (default: 1.0) [dynamic reloadable]
Expand Down
2 changes: 2 additions & 0 deletions lib/serverengine/multi_process_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ def initialize(worker, wid, pmon, reload_signal = Signals::RELOAD, unrecoverable
@unrecoverable_exit_codes = unrecoverable_exit_codes
@unrecoverable_exit = false
@exitstatus = nil
@restart_at = nil
end

attr_reader :exitstatus
attr_accessor :restart_at

def send_stop(stop_graceful)
@stop = true
Expand Down
3 changes: 3 additions & 0 deletions lib/serverengine/multi_thread_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class WorkerMonitor
def initialize(worker, thread)
@worker = worker
@thread = thread
@restart_at = nil
end

attr_accessor :restart_at

def send_stop(stop_graceful)
Thread.new do
begin
Expand Down
41 changes: 33 additions & 8 deletions lib/serverengine/multi_worker_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def reload

def run
while true
num_alive = keepalive_workers
break if num_alive == 0
num_alive_or_restarting = keepalive_workers
break if num_alive_or_restarting == 0
wait_tick
end
end
Expand Down Expand Up @@ -85,6 +85,7 @@ def reload_config

@start_worker_delay = @config[:start_worker_delay] || 0
@start_worker_delay_rand = @config[:start_worker_delay_rand] || 0.2
@restart_worker_interval = @config[:restart_worker_interval] || 0

scale_workers(@config[:workers] || 1)

Expand All @@ -96,12 +97,12 @@ def wait_tick
end

def keepalive_workers
num_alive = 0
num_alive_or_restarting = 0

@monitors.each_with_index do |m,wid|
if m && m.alive?
# alive
num_alive += 1
num_alive_or_restarting += 1

elsif m && m.respond_to?(:recoverable?) && !m.recoverable?
# exited, with unrecoverable exit code
Expand All @@ -116,8 +117,12 @@ def keepalive_workers
elsif wid < @num_workers
# scale up or reboot
unless @stop
@monitors[wid] = delayed_start_worker(wid)
num_alive += 1
if m
restart_worker(wid)
else
start_new_worker(wid)
end
num_alive_or_restarting += 1
end

elsif m
Expand All @@ -126,7 +131,27 @@ def keepalive_workers
end
end

return num_alive
return num_alive_or_restarting
end

def start_new_worker(wid)
delayed_start_worker(wid)
end

def restart_worker(wid)
m = @monitors[wid]

is_already_restarting = !m.restart_at.nil?
if is_already_restarting
delayed_start_worker(wid) if m.restart_at <= Time.now()
return
end

if @restart_worker_interval > 0
m.restart_at = Time.now() + @restart_worker_interval
else
delayed_start_worker(wid)
end
end

def delayed_start_worker(wid)
Expand All @@ -143,7 +168,7 @@ def delayed_start_worker(wid)
@last_start_worker_time = now
end

start_worker(wid)
@monitors[wid] = start_worker(wid)
end
end

Expand Down
2 changes: 2 additions & 0 deletions serverengine.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Gem::Specification.new do |gem|
gem.add_development_dependency 'rake-compiler-dock', ['~> 0.5.0']
gem.add_development_dependency 'rake-compiler', ['~> 0.9.4']

gem.add_development_dependency "timecop", ["~> 0.9.5"]

# build gem for a certain platform. see also Rakefile
fake_platform = ENV['GEM_BUILD_FAKE_PLATFORM'].to_s
gem.platform = fake_platform unless fake_platform.empty?
Expand Down
208 changes: 195 additions & 13 deletions spec/multi_spawn_server_spec.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,207 @@
require 'timeout'
require 'timecop'

describe ServerEngine::MultiSpawnServer do
include_context 'test server and worker'

context 'with command_sender=pipe' do
it 'starts worker processes' do
config = {workers: 2, command_sender: 'pipe', log_stdout: false, log_stderr: false}
describe 'starts worker processes' do
context 'with command_sender=pipe' do
it do
config = {workers: 2, command_sender: 'pipe', log_stdout: false, log_stderr: false}

s = ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup }
t = Thread.new { s.main }
s = ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup }
t = Thread.new { s.main }

begin
wait_for_fork
begin
wait_for_fork

Timeout.timeout(5) do
sleep(0.5) until test_state(:worker_run) == 2
Timeout.timeout(5) do
sleep(0.5) until test_state(:worker_run) == 2
end
test_state(:worker_run).should == 2
ensure
s.stop(true)
t.join
end
end
end
end

describe 'keepalive_workers' do
let(:config) {
{
workers: workers,
command_sender: 'pipe',
log_stdout: false,
log_stderr: false,
start_worker_delay: start_worker_delay,
start_worker_delay_rand: 0,
restart_worker_interval: restart_worker_interval,
}
}
let(:workers) { 3 }
let(:server) { ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup } }
let(:monitors) { server.instance_variable_get(:@monitors) }

context 'default' do
let(:start_worker_delay) { 0 }
let(:restart_worker_interval) { 0 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

monitors.each do |m|
m.send_stop(true)
end

# On Windows, it takes some time to stop all multiple processes, so allow leeway to judge.
-> {
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m.alive? } == workers
end
}.should_not raise_error, "Not all workers restarted correctly."
ensure
server.stop(true)
t.join
end
end
end

context 'with only restart_worker_interval' do
let(:start_worker_delay) { 0 }
let(:restart_worker_interval) { 10 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Wait for initial starting
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

monitors.each do |m|
m.send_stop(true)
end

# Wait for all workers to stop
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == 0
end

Timecop.freeze

mergin_time = 3

Timecop.freeze(Time.now + restart_worker_interval - mergin_time)
sleep(1.5)
monitors.count { |m| m.alive? }.should == 0

Timecop.freeze(Time.now + 2 * mergin_time)
# On Windows, it takes some time to stop all multiple processes, so allow leeway to judge.
-> {
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m.alive? } == workers
end
}.should_not raise_error, "Not all workers restarted correctly."
ensure
server.stop(true)
t.join
end
end
end

context 'with only start_worker_delay' do
let(:start_worker_delay) { 3 }
let(:restart_worker_interval) { 0 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Initial starts are delayed too, so set longer timeout.
# (`start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.)
Timeout.timeout(start_worker_delay * workers) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

# Skip time to avoid getting a delay for the initial starts.
Timecop.travel(Time.now + start_worker_delay)

monitors.each do |m|
m.send_stop(true)
end

sleep(3)

# The first worker should restart immediately.
monitors.count { |m| m.alive? }.should satisfy { |c| 0 < c && c < workers }

# `start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.
sleep(start_worker_delay * workers)
monitors.count { |m| m.alive? }.should == workers
ensure
server.stop(true)
t.join
end
end
end

context 'with both options' do
let(:start_worker_delay) { 3 }
let(:restart_worker_interval) { 10 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Initial starts are delayed too, so set longer timeout.
# (`start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.)
Timeout.timeout(start_worker_delay * workers) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

monitors.each do |m|
m.send_stop(true)
end

# Wait for all workers to stop
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == 0
end

Timecop.freeze

mergin_time = 3

Timecop.freeze(Time.now + restart_worker_interval - mergin_time)
sleep(1.5)
monitors.count { |m| m.alive? }.should == 0

Timecop.travel(Time.now + 2 * mergin_time)
sleep(1.5)
monitors.count { |m| m.alive? }.should satisfy { |c| 0 < c && c < workers }

# `start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.
sleep(start_worker_delay * workers)
monitors.count { |m| m.alive? }.should == workers
ensure
server.stop(true)
t.join
end
test_state(:worker_run).should == 2
ensure
s.stop(true)
t.join
end
end
end
Expand Down
7 changes: 5 additions & 2 deletions spec/server_worker_context.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

require 'thread'
require 'yaml'
require 'timecop'

def reset_test_state
FileUtils.mkdir_p 'tmp'
Expand Down Expand Up @@ -165,8 +166,9 @@ def before_fork

def run
incr_test_state :worker_run
5.times do
# repeats 5 times because signal handlers
# This means this worker will automatically finish after 50 seconds.
10.times do
# repeats multiple times because signal handlers
# interrupts wait
@stop_flag.wait(5.0)
end
Expand Down Expand Up @@ -252,6 +254,7 @@ def stop

shared_context 'test server and worker' do
before { reset_test_state }
after { Timecop.return }

if ServerEngine.windows?
WAIT_RATIO = 2
Expand Down

0 comments on commit dcc591a

Please sign in to comment.