Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interval to restart worker #120

Merged
merged 12 commits into from
Jun 10, 2022
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ Available methods are different depending on `worker_type`. ServerEngine support
- **server_detach_wait** sets wait time before starting live restart (default: 10.0) [dynamic reloadable]
- Multithread server and multiprocess server: available only when `worker_type` is thread or process
- **workers** sets number of workers (default: 1) [dynamic reloadable]
- **start_worker_delay** sets wait time before starting a new worker (default: 0) [dynamic reloadable]
- **start_worker_delay** sets the delay between each worker-start when starting/restarting multiple workers at once (default: 0) [dynamic reloadable]
- **start_worker_delay_rand** randomizes start_worker_delay at this ratio (default: 0.2) [dynamic reloadable]
- **restart_worker_interval** sets wait time before restarting a stopped worker (default: 0) [dynamic reloadable]
- Multiprocess server: available only when `worker_type` is "process"
- **worker_process_name** changes process name ($0) of workers [dynamic reloadable]
- **worker_heartbeat_interval** sets interval of heartbeats in seconds (default: 1.0) [dynamic reloadable]
Expand Down
2 changes: 2 additions & 0 deletions lib/serverengine/multi_process_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ def initialize(worker, wid, pmon, reload_signal = Signals::RELOAD, unrecoverable
@unrecoverable_exit_codes = unrecoverable_exit_codes
@unrecoverable_exit = false
@exitstatus = nil
@restart_at = nil
end

attr_reader :exitstatus
attr_accessor :restart_at

def send_stop(stop_graceful)
@stop = true
Expand Down
3 changes: 3 additions & 0 deletions lib/serverengine/multi_thread_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class WorkerMonitor
# Sets up the monitor state for one worker.
#
# worker - the worker object handed to this monitor
# thread - the thread handed in together with the worker
#          (presumably the thread the worker runs on; confirm against the caller)
def initialize(worker, thread)
  # A freshly created monitor has no restart scheduled.
  @restart_at = nil
  @thread = thread
  @worker = worker
end

attr_accessor :restart_at

def send_stop(stop_graceful)
Thread.new do
begin
Expand Down
41 changes: 33 additions & 8 deletions lib/serverengine/multi_worker_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def reload

def run
while true
num_alive = keepalive_workers
break if num_alive == 0
num_alive_or_restarting = keepalive_workers
break if num_alive_or_restarting == 0
wait_tick
end
end
Expand Down Expand Up @@ -85,6 +85,7 @@ def reload_config

@start_worker_delay = @config[:start_worker_delay] || 0
@start_worker_delay_rand = @config[:start_worker_delay_rand] || 0.2
@restart_worker_interval = @config[:restart_worker_interval] || 0

scale_workers(@config[:workers] || 1)

Expand All @@ -96,12 +97,12 @@ def wait_tick
end

def keepalive_workers
num_alive = 0
num_alive_or_restarting = 0

@monitors.each_with_index do |m,wid|
if m && m.alive?
# alive
num_alive += 1
num_alive_or_restarting += 1

elsif m && m.respond_to?(:recoverable?) && !m.recoverable?
# exited, with unrecoverable exit code
Expand All @@ -116,8 +117,12 @@ def keepalive_workers
elsif wid < @num_workers
# scale up or reboot
unless @stop
@monitors[wid] = delayed_start_worker(wid)
num_alive += 1
if m
restart_worker(wid)
else
start_new_worker(wid)
end
num_alive_or_restarting += 1
end

elsif m
Expand All @@ -126,7 +131,27 @@ def keepalive_workers
end
end

return num_alive
return num_alive_or_restarting
end

# Starts a worker for slot `wid` when that slot has no previous monitor.
# This path simply delegates to delayed_start_worker; it does not consult
# @restart_worker_interval.
def start_new_worker(wid)
delayed_start_worker(wid)
end

# Restarts the stopped worker in slot `wid`, honouring @restart_worker_interval.
#
# The monitor's `restart_at` acts as the "restart already scheduled" marker:
# - restart_at set:       start the worker once that time has been reached,
#                         otherwise do nothing on this call.
# - restart_at unset and
#   interval > 0:         schedule the restart by recording restart_at.
# - restart_at unset and
#   interval is 0:        start the worker right away.
def restart_worker(wid)
  monitor = @monitors[wid]
  scheduled_at = monitor.restart_at

  unless scheduled_at.nil?
    # A restart is pending; only proceed once the scheduled time has come.
    delayed_start_worker(wid) if scheduled_at <= Time.now
    return
  end

  # No interval configured: restart without waiting.
  return delayed_start_worker(wid) unless @restart_worker_interval > 0

  monitor.restart_at = Time.now + @restart_worker_interval
end

def delayed_start_worker(wid)
Expand All @@ -143,7 +168,7 @@ def delayed_start_worker(wid)
@last_start_worker_time = now
end

start_worker(wid)
@monitors[wid] = start_worker(wid)
end
end

Expand Down
1 change: 1 addition & 0 deletions serverengine.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.required_ruby_version = ">= 2.1.0"

gem.add_dependency "sigdump", ["~> 0.2.2"]
# timecop is required only from the spec files in this change, so declare it as a
# development dependency instead of forcing it on every runtime user of the gem.
gem.add_development_dependency "timecop", ["~> 0.9.5"]
daipom marked this conversation as resolved.
Show resolved Hide resolved

# rake v12.x doesn't work with rspec 2. rspec should be updated to 3
gem.add_development_dependency "rake", ["~> 11.0"]
Expand Down
208 changes: 195 additions & 13 deletions spec/multi_spawn_server_spec.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,207 @@
require 'timeout'
require 'timecop'

describe ServerEngine::MultiSpawnServer do
include_context 'test server and worker'

context 'with command_sender=pipe' do
it 'starts worker processes' do
config = {workers: 2, command_sender: 'pipe', log_stdout: false, log_stderr: false}
describe 'starts worker processes' do
context 'with command_sender=pipe' do
it do
config = {workers: 2, command_sender: 'pipe', log_stdout: false, log_stderr: false}

s = ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup }
t = Thread.new { s.main }
s = ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup }
t = Thread.new { s.main }

begin
wait_for_fork
begin
wait_for_fork

Timeout.timeout(5) do
sleep(0.5) until test_state(:worker_run) == 2
Timeout.timeout(5) do
sleep(0.5) until test_state(:worker_run) == 2
end
test_state(:worker_run).should == 2
ensure
s.stop(true)
t.join
end
end
end
end

describe 'keepalive_workers' do
let(:config) {
{
workers: workers,
command_sender: 'pipe',
log_stdout: false,
log_stderr: false,
start_worker_delay: start_worker_delay,
start_worker_delay_rand: 0,
restart_worker_interval: restart_worker_interval,
}
}
let(:workers) { 3 }
let(:server) { ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup } }
let(:monitors) { server.instance_variable_get(:@monitors) }

# With start_worker_delay and restart_worker_interval both 0, all workers are
# expected to be alive again within the 5-second timeout after being stopped.
context 'default' do
let(:start_worker_delay) { 0 }
let(:restart_worker_interval) { 0 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Wait until every worker has come up for the first time.
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

# Ask every worker to stop gracefully.
monitors.each do |m|
m.send_stop(true)
end

# On Windows, it takes some time to stop all multiple processes, so allow leeway to judge.
# NOTE(review): this wait could be satisfied right away if the workers have not
# actually stopped yet - confirm whether that is acceptable for this case.
-> {
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m.alive? } == workers
end
}.should_not raise_error, "Not all workers restarted correctly."
ensure
server.stop(true)
t.join
end
end
end

# With restart_worker_interval = 10 and no start delay, stopped workers must stay
# down until the interval has passed and must all be alive again afterwards.
# Timecop moves the clock, so the spec does not wait the full interval in real time.
context 'with only restart_worker_interval' do
let(:start_worker_delay) { 0 }
let(:restart_worker_interval) { 10 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Wait for initial starting
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

monitors.each do |m|
m.send_stop(true)
end

# Wait for all workers to stop
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == 0
end

# Freeze the clock so Time.now only changes when this spec moves it.
Timecop.freeze

# NOTE(review): `mergin_time` looks like a typo for `margin_time`.
mergin_time = 3

# Move to 3 seconds before the interval elapses (relative to the frozen time):
# no worker should be alive yet.
Timecop.freeze(Time.now + restart_worker_interval - mergin_time)
sleep(1.5)
monitors.count { |m| m.alive? }.should == 0

# Move the frozen clock a further 6 seconds forward, past the interval.
Timecop.freeze(Time.now + 2 * mergin_time)
# On Windows, it takes some time to stop all multiple processes, so allow leeway to judge.
-> {
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m.alive? } == workers
end
}.should_not raise_error, "Not all workers restarted correctly."
ensure
server.stop(true)
t.join
end
end
end

# With start_worker_delay = 3 and no restart interval, the spec expects only some
# of the workers to be alive 3 seconds after all were stopped, and all of them to
# be alive after a further start_worker_delay * workers seconds.
context 'with only start_worker_delay' do
let(:start_worker_delay) { 3 }
let(:restart_worker_interval) { 0 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Initial starts are delayed too, so set longer timeout.
# (`start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.)
Timeout.timeout(start_worker_delay * workers) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

# Skip time to avoid getting a delay for the initial starts.
Timecop.travel(Time.now + start_worker_delay)

monitors.each do |m|
m.send_stop(true)
end

# NOTE(review): this literal 3 matches start_worker_delay; presumably it is meant
# to leave time for the stop and the first restart - confirm.
sleep(3)

# The first worker should restart immediately.
# Some, but not all, workers are expected to be alive at this point.
monitors.count { |m| m.alive? }.should satisfy { |c| 0 < c && c < workers }

# `start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.
sleep(start_worker_delay * workers)
monitors.count { |m| m.alive? }.should == workers
ensure
server.stop(true)
t.join
end
end
end

context 'with both options' do
let(:start_worker_delay) { 3 }
let(:restart_worker_interval) { 10 }

it do
t = Thread.new { server.main }

begin
wait_for_fork

# Initial starts are delayed too, so set longer timeout.
# (`start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.)
Timeout.timeout(start_worker_delay * workers) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == workers
end

monitors.each do |m|
m.send_stop(true)
end

# Wait for all workers to stop
Timeout.timeout(5) do
sleep(0.5) until monitors.count { |m| m && m.alive? } == 0
end

Timecop.freeze

mergin_time = 3

Timecop.freeze(Time.now + restart_worker_interval - mergin_time)
sleep(1.5)
monitors.count { |m| m.alive? }.should == 0

Timecop.travel(Time.now + 2 * mergin_time)
sleep(1.5)
monitors.count { |m| m.alive? }.should satisfy { |c| 0 < c && c < workers }

# `start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.
sleep(start_worker_delay * workers)
monitors.count { |m| m.alive? }.should == workers
ensure
server.stop(true)
t.join
end
test_state(:worker_run).should == 2
ensure
s.stop(true)
t.join
end
end
end
Expand Down
7 changes: 5 additions & 2 deletions spec/server_worker_context.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

require 'thread'
require 'yaml'
require 'timecop'

def reset_test_state
FileUtils.mkdir_p 'tmp'
Expand Down Expand Up @@ -165,8 +166,9 @@ def before_fork

def run
incr_test_state :worker_run
5.times do
# repeats 5 times because signal handlers
# This means this worker will automatically finish after 50 seconds.
10.times do
# repeats multiple times because signal handlers
# interrupts wait
@stop_flag.wait(5.0)
end
Expand Down Expand Up @@ -252,6 +254,7 @@ def stop

shared_context 'test server and worker' do
before { reset_test_state }
after { Timecop.return }

if ServerEngine.windows?
WAIT_RATIO = 2
Expand Down