diff --git a/README.md b/README.md index 758fb29..e6fdc7a 100644 --- a/README.md +++ b/README.md @@ -480,8 +480,9 @@ Available methods are different depending on `worker_type`. ServerEngine support - **server_detach_wait** sets wait time before starting live restart (default: 10.0) [dynamic reloadable] - Multithread server and multiprocess server: available only when `worker_type` is thread or process - **workers** sets number of workers (default: 1) [dynamic reloadable] - - **start_worker_delay** sets wait time before starting a new worker (default: 0) [dynamic reloadable] + - **start_worker_delay** sets the delay between each worker-start when starting/restarting multiple workers at once (default: 0) [dynamic reloadable] - **start_worker_delay_rand** randomizes start_worker_delay at this ratio (default: 0.2) [dynamic reloadable] + - **restart_worker_interval** sets wait time before restarting a stopped worker (default: 0) [dynamic reloadable] - Multiprocess server: available only when `worker_type` is "process" - **worker_process_name** changes process name ($0) of workers [dynamic reloadable] - **worker_heartbeat_interval** sets interval of heartbeats in seconds (default: 1.0) [dynamic reloadable] diff --git a/lib/serverengine/multi_process_server.rb b/lib/serverengine/multi_process_server.rb index 19cd72a..41761b4 100644 --- a/lib/serverengine/multi_process_server.rb +++ b/lib/serverengine/multi_process_server.rb @@ -105,9 +105,11 @@ def initialize(worker, wid, pmon, reload_signal = Signals::RELOAD, unrecoverable @unrecoverable_exit_codes = unrecoverable_exit_codes @unrecoverable_exit = false @exitstatus = nil + @restart_at = nil end attr_reader :exitstatus + attr_accessor :restart_at def send_stop(stop_graceful) @stop = true diff --git a/lib/serverengine/multi_thread_server.rb b/lib/serverengine/multi_thread_server.rb index 0b3e2d1..6615937 100644 --- a/lib/serverengine/multi_thread_server.rb +++ b/lib/serverengine/multi_thread_server.rb @@ -39,8 +39,11 @@ class WorkerMonitor def initialize(worker, thread) @worker = worker @thread = thread + @restart_at = nil end + attr_accessor :restart_at + def send_stop(stop_graceful) Thread.new do begin diff --git a/lib/serverengine/multi_worker_server.rb b/lib/serverengine/multi_worker_server.rb index 60b650f..1fef343 100644 --- a/lib/serverengine/multi_worker_server.rb +++ b/lib/serverengine/multi_worker_server.rb @@ -55,8 +55,8 @@ def reload def run while true - num_alive = keepalive_workers - break if num_alive == 0 + num_alive_or_restarting = keepalive_workers + break if num_alive_or_restarting == 0 wait_tick end end @@ -85,6 +85,7 @@ def reload_config @start_worker_delay = @config[:start_worker_delay] || 0 @start_worker_delay_rand = @config[:start_worker_delay_rand] || 0.2 + @restart_worker_interval = @config[:restart_worker_interval] || 0 scale_workers(@config[:workers] || 1) @@ -96,12 +97,12 @@ def wait_tick end def keepalive_workers - num_alive = 0 + num_alive_or_restarting = 0 @monitors.each_with_index do |m,wid| if m && m.alive? # alive - num_alive += 1 + num_alive_or_restarting += 1 elsif m && m.respond_to?(:recoverable?) && !m.recoverable? # exited, with unrecoverable exit code @@ -116,8 +117,12 @@ def keepalive_workers elsif wid < @num_workers # scale up or reboot unless @stop - @monitors[wid] = delayed_start_worker(wid) - num_alive += 1 + if m + restart_worker(wid) + else + start_new_worker(wid) + end + num_alive_or_restarting += 1 end elsif m @@ -126,7 +131,27 @@ def keepalive_workers end end - return num_alive + return num_alive_or_restarting + end + + def start_new_worker(wid) + delayed_start_worker(wid) + end + + def restart_worker(wid) + m = @monitors[wid] + + is_already_restarting = !m.restart_at.nil? + if is_already_restarting + delayed_start_worker(wid) if m.restart_at <= Time.now() + return + end + + if @restart_worker_interval > 0 + m.restart_at = Time.now() + @restart_worker_interval + else + delayed_start_worker(wid) + end end def delayed_start_worker(wid) @@ -143,7 +168,7 @@ def delayed_start_worker(wid) @last_start_worker_time = now end - start_worker(wid) + @monitors[wid] = start_worker(wid) end end diff --git a/serverengine.gemspec b/serverengine.gemspec index 91b7f0b..955d9c1 100644 --- a/serverengine.gemspec +++ b/serverengine.gemspec @@ -27,6 +27,8 @@ Gem::Specification.new do |gem| gem.add_development_dependency 'rake-compiler-dock', ['~> 0.5.0'] gem.add_development_dependency 'rake-compiler', ['~> 0.9.4'] + gem.add_development_dependency "timecop", ["~> 0.9.5"] + # build gem for a certain platform. see also Rakefile fake_platform = ENV['GEM_BUILD_FAKE_PLATFORM'].to_s gem.platform = fake_platform unless fake_platform.empty? diff --git a/spec/multi_spawn_server_spec.rb b/spec/multi_spawn_server_spec.rb index 194e234..13441d1 100644 --- a/spec/multi_spawn_server_spec.rb +++ b/spec/multi_spawn_server_spec.rb @@ -1,25 +1,207 @@ require 'timeout' +require 'timecop' describe ServerEngine::MultiSpawnServer do include_context 'test server and worker' - context 'with command_sender=pipe' do - it 'starts worker processes' do - config = {workers: 2, command_sender: 'pipe', log_stdout: false, log_stderr: false} + describe 'starts worker processes' do + context 'with command_sender=pipe' do + it do + config = {workers: 2, command_sender: 'pipe', log_stdout: false, log_stderr: false} - s = ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup } - t = Thread.new { s.main } + s = ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup } + t = Thread.new { s.main } - begin - wait_for_fork + begin + wait_for_fork - Timeout.timeout(5) do - sleep(0.5) until test_state(:worker_run) == 2 + Timeout.timeout(5) do + sleep(0.5) until test_state(:worker_run) == 2 + end + test_state(:worker_run).should == 2 + ensure + s.stop(true) + t.join + end + end + end + end + + describe 'keepalive_workers' do + let(:config) { + { + workers: workers, + command_sender: 'pipe', + log_stdout: false, + log_stderr: false, + start_worker_delay: start_worker_delay, + start_worker_delay_rand: 0, + restart_worker_interval: restart_worker_interval, + } + } + let(:workers) { 3 } + let(:server) { ServerEngine::MultiSpawnServer.new(TestWorker) { config.dup } } + let(:monitors) { server.instance_variable_get(:@monitors) } + + context 'default' do + let(:start_worker_delay) { 0 } + let(:restart_worker_interval) { 0 } + + it do + t = Thread.new { server.main } + + begin + wait_for_fork + + Timeout.timeout(5) do + sleep(0.5) until monitors.count { |m| m && m.alive? } == workers + end + + monitors.each do |m| + m.send_stop(true) + end + + # On Windows, it takes some time to stop all multiple processes, so allow leeway to judge. + -> { + Timeout.timeout(5) do + sleep(0.5) until monitors.count { |m| m.alive? } == workers + end + }.should_not raise_error, "Not all workers restarted correctly." + ensure + server.stop(true) + t.join + end + end + end + + context 'with only restart_worker_interval' do + let(:start_worker_delay) { 0 } + let(:restart_worker_interval) { 10 } + + it do + t = Thread.new { server.main } + + begin + wait_for_fork + + # Wait for initial starting + Timeout.timeout(5) do + sleep(0.5) until monitors.count { |m| m && m.alive? } == workers + end + + monitors.each do |m| + m.send_stop(true) + end + + # Wait for all workers to stop + Timeout.timeout(5) do + sleep(0.5) until monitors.count { |m| m && m.alive? } == 0 + end + + Timecop.freeze + + mergin_time = 3 + + Timecop.freeze(Time.now + restart_worker_interval - mergin_time) + sleep(1.5) + monitors.count { |m| m.alive? }.should == 0 + + Timecop.freeze(Time.now + 2 * mergin_time) + # On Windows, it takes some time to stop all multiple processes, so allow leeway to judge. + -> { + Timeout.timeout(5) do + sleep(0.5) until monitors.count { |m| m.alive? } == workers + end + }.should_not raise_error, "Not all workers restarted correctly." + ensure + server.stop(true) + t.join + end + end + end + + context 'with only start_worker_delay' do + let(:start_worker_delay) { 3 } + let(:restart_worker_interval) { 0 } + + it do + t = Thread.new { server.main } + + begin + wait_for_fork + + # Initial starts are delayed too, so set longer timeout. + # (`start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.) + Timeout.timeout(start_worker_delay * workers) do + sleep(0.5) until monitors.count { |m| m && m.alive? } == workers + end + + # Skip time to avoid getting a delay for the initial starts. + Timecop.travel(Time.now + start_worker_delay) + + monitors.each do |m| + m.send_stop(true) + end + + sleep(3) + + # The first worker should restart immediately. + monitors.count { |m| m.alive? }.should satisfy { |c| 0 < c && c < workers } + + # `start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait. + sleep(start_worker_delay * workers) + monitors.count { |m| m.alive? }.should == workers + ensure + server.stop(true) + t.join + end + end + end + + context 'with both options' do + let(:start_worker_delay) { 3 } + let(:restart_worker_interval) { 10 } + + it do + t = Thread.new { server.main } + + begin + wait_for_fork + + # Initial starts are delayed too, so set longer timeout. + # (`start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait.) + Timeout.timeout(start_worker_delay * workers) do + sleep(0.5) until monitors.count { |m| m && m.alive? } == workers + end + + monitors.each do |m| + m.send_stop(true) + end + + # Wait for all workers to stop + Timeout.timeout(5) do + sleep(0.5) until monitors.count { |m| m && m.alive? } == 0 + end + + Timecop.freeze + + mergin_time = 3 + + Timecop.freeze(Time.now + restart_worker_interval - mergin_time) + sleep(1.5) + monitors.count { |m| m.alive? }.should == 0 + + Timecop.travel(Time.now + 2 * mergin_time) + sleep(1.5) + monitors.count { |m| m.alive? }.should satisfy { |c| 0 < c && c < workers } + + # `start_worker_delay` uses `sleep` inside, so Timecop can't skip this wait. + sleep(start_worker_delay * workers) + monitors.count { |m| m.alive? }.should == workers + ensure + server.stop(true) + t.join end - test_state(:worker_run).should == 2 - ensure - s.stop(true) - t.join end end end diff --git a/spec/server_worker_context.rb b/spec/server_worker_context.rb index 7ed2062..21bd71b 100644 --- a/spec/server_worker_context.rb +++ b/spec/server_worker_context.rb @@ -1,6 +1,7 @@ require 'thread' require 'yaml' +require 'timecop' def reset_test_state FileUtils.mkdir_p 'tmp' @@ -165,8 +166,9 @@ def before_fork def run incr_test_state :worker_run - 5.times do - # repeats 5 times because signal handlers + # This means this worker will automatically finish after 50 seconds. + 10.times do + # repeats multiple times because signal handlers # interrupts wait @stop_flag.wait(5.0) end @@ -252,6 +254,7 @@ def stop shared_context 'test server and worker' do before { reset_test_state } + after { Timecop.return } if ServerEngine.windows? WAIT_RATIO = 2