Skip to content

Commit d0f31e8

Browse files
committed
Restart without downtime
Add a new feature: Update/Reload without downtime. 1. The current supervisor receives a signal. 2. The current supervisor sends signals to its workers, and the workers stop all plugins that cannot run in parallel. 3. The current supervisor starts a new supervisor. * => Old processes and new processes run in parallel. 4. After the new supervisor and its workers start to work, the current supervisor and its workers stop. ref: nginx's feature for upgrading on the fly * http://nginx.org/en/docs/control.html#upgrade Problem to solve: Updating Fluentd or reloading a config causes downtime. Plugins that receive data as a server, such as `in_udp`, `in_tcp`, and `in_syslog`, cannot receive data during this time. This means that the data sent by a client is lost during this time unless the client has a re-sending feature. This makes updating Fluentd or reloading a config difficult in some cases. Specific feature: Run only limited Input plugins in parallel, such as `in_tcp`, `in_udp`, and `in_syslog`. Stop all plugins except those Input plugins, and prepare an agent for forwarding data to the new workers. After the new workers start, they receive events from the old workers. Note: need treasure-data/serverengine#146 Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent 79d0293 commit d0f31e8

File tree

7 files changed

+163
-11
lines changed

7 files changed

+163
-11
lines changed

lib/fluent/engine.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ def flush!
136136
@root_agent.flush!
137137
end
138138

139+
def shift_to_limited_mode!
140+
@root_agent.shift_to_limited_mode!
141+
end
142+
139143
def now
140144
# TODO thread update
141145
Fluent::EventTime.now

lib/fluent/plugin/in_tcp.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ def multi_workers_ready?
101101
true
102102
end
103103

104+
def limited_mode_ready?
105+
true
106+
end
107+
104108
def start
105109
super
106110

lib/fluent/plugin/in_udp.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def multi_workers_ready?
6565
true
6666
end
6767

68+
def limited_mode_ready?
69+
true
70+
end
71+
6872
def start
6973
super
7074

lib/fluent/plugin/input.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ def metric_callback(es)
7070
def multi_workers_ready?
7171
false
7272
end
73+
74+
def limited_mode_ready?
75+
false
76+
end
77+
78+
def shift_to_limited_mode!
79+
raise "BUG: use shift_to_limited_mode although this plugin is not ready for the limited mode" unless limited_mode_ready?
80+
event_emitter_force_limited_router
81+
end
7382
end
7483
end
7584
end

lib/fluent/plugin_helper/event_emitter.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ module EventEmitter
2626

2727
def router
2828
@_event_emitter_used_actually = true
29+
30+
return Engine.root_agent.limited_router if @_event_emitter_force_limited_router
31+
2932
if @_event_emitter_lazy_init
3033
@router = @primary_instance.router
3134
end
@@ -48,6 +51,10 @@ def event_emitter_used_actually?
4851
@_event_emitter_used_actually
4952
end
5053

54+
def event_emitter_force_limited_router
55+
@_event_emitter_force_limited_router = true
56+
end
57+
5158
def event_emitter_router(label_name)
5259
if label_name
5360
if label_name == "@ROOT"
@@ -72,6 +79,7 @@ def initialize
7279
super
7380
@_event_emitter_used_actually = false
7481
@_event_emitter_lazy_init = false
82+
@_event_emitter_force_limited_router = false
7583
@router = nil
7684
end
7785

lib/fluent/root_agent.rb

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,16 @@ def initialize(log:, system_config: SystemConfig.new)
6060
suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
6161
@without_source = system_config.without_source unless system_config.without_source.nil?
6262
@enable_input_metrics = !!system_config.enable_input_metrics
63+
64+
@limited_mode_agent = nil
65+
@limited_router = nil
66+
@limited_mode_forwarding_port = "29140"
67+
@limited_mode_forwarding_buf_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, "limited_mode_buffer")
6368
end
6469

6570
attr_reader :inputs
6671
attr_reader :labels
72+
attr_reader :limited_router
6773

6874
def configure(conf)
6975
used_worker_ids = []
@@ -161,6 +167,9 @@ def configure(conf)
161167
add_source(type, e)
162168
}
163169
end
170+
171+
# TODO Stop doing this when it is not needed.
172+
add_source_to_receive_from_limited_mode_agent
164173
end
165174

166175
def setup_error_label(e)
@@ -192,10 +201,15 @@ def lifecycle(desc: false, kind_callback: nil)
192201
yield instance, display_kind
193202
end
194203
end
195-
if kind_callback
196-
kind_callback.call
197-
end
204+
205+
kind_callback&.call
206+
end
207+
208+
return unless @limited_mode_agent
209+
@limited_mode_agent.lifecycle do |plugin, display_kind|
210+
yield plugin, display_kind
198211
end
212+
kind_callback&.call
199213
end
200214

201215
def start
@@ -231,6 +245,84 @@ def flush!
231245
flushing_threads.each{|t| t.join }
232246
end
233247

248+
def shift_to_limited_mode!
249+
log.info "shifts to the limited mode"
250+
251+
limited_mode_agent = create_limited_mode_agent
252+
@limited_router = limited_mode_agent.event_router
253+
limited_mode_agent.lifecycle(desc: true) do |plugin|
254+
plugin.start unless plugin.started?
255+
plugin.after_start unless plugin.after_started?
256+
end
257+
258+
lifecycle_control_list[:input].select do |instance|
259+
instance.limited_mode_ready?
260+
end.each do |instance|
261+
instance.shift_to_limited_mode!
262+
end
263+
264+
SHUTDOWN_SEQUENCES.each do |sequence|
265+
if sequence.safe?
266+
lifecycle do |instance, kind|
267+
next if kind == :input and instance.limited_mode_ready?
268+
execute_shutdown_sequence(sequence, instance, kind)
269+
end
270+
next
271+
end
272+
273+
operation_threads = []
274+
callback = ->(){
275+
operation_threads.each { |t| t.join }
276+
operation_threads.clear
277+
}
278+
lifecycle(kind_callback: callback) do |instance, kind|
279+
next if kind == :input and instance.limited_mode_ready?
280+
t = Thread.new do
281+
Thread.current.abort_on_exception = true
282+
execute_shutdown_sequence(sequence, instance, kind)
283+
end
284+
operation_threads << t
285+
end
286+
end
287+
288+
@limited_mode_agent = limited_mode_agent
289+
end
290+
291+
def create_limited_mode_agent
292+
limited_mode_agent = Agent.new(log: log)
293+
limited_mode_agent.configure(
294+
Config::Element.new('LIMITED_MODE_OUTPUT', '', {}, [
295+
Config::Element.new('match', '**', {'@type' => 'forward'}, [
296+
Config::Element.new('server', '', {
297+
'host' => 'localhost',
298+
'port' => @limited_mode_forwarding_port,
299+
}, []),
300+
Config::Element.new('buffer', '', {
301+
'@type' => 'file',
302+
'path' => @limited_mode_forwarding_buf_path,
303+
'flush_at_shutdown' => 'true',
304+
'retry_type' => 'periodic',
305+
'retry_wait' => '10s',
306+
'retry_randomize' => 'false',
307+
}, []),
308+
])
309+
])
310+
)
311+
limited_mode_agent
312+
end
313+
314+
def add_source_to_receive_from_limited_mode_agent
315+
add_source(
316+
'forward',
317+
Config::Element.new('source', '', {
318+
'@type' => 'forward',
319+
'bind' => 'localhost',
320+
'port' => @limited_mode_forwarding_port,
321+
}, []
322+
),
323+
)
324+
end
325+
234326
class ShutdownSequence
235327
attr_reader :method, :checker
236328
def initialize(method, checker, is_safe)

lib/fluent/supervisor.rb

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def before_run
4343
@rpc_endpoint = nil
4444
@rpc_server = nil
4545
@counter = nil
46+
@socket_manager_server = nil
47+
@is_limited_mode = false
4648

4749
@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
4850
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
@@ -66,8 +68,12 @@ def before_run
6668
if config[:disable_shared_socket]
6769
$log.info "shared socket for multiple workers is disabled"
6870
else
69-
server = ServerEngine::SocketManager::Server.open
70-
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
71+
if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
72+
@socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
73+
else
74+
@socket_manager_server = ServerEngine::SocketManager::Server.open
75+
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
76+
end
7177
end
7278
end
7379

@@ -76,7 +82,7 @@ def after_run
7682
stop_rpc_server if @rpc_endpoint
7783
stop_counter_server if @counter
7884
cleanup_lock_dir
79-
Fluent::Supervisor.cleanup_resources
85+
Fluent::Supervisor.cleanup_resources unless @is_limited_mode
8086
end
8187

8288
def cleanup_lock_dir
@@ -138,7 +144,7 @@ def run_rpc_server
138144
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
139145
$log.debug "fluentd RPC got /api/config.gracefulReload request"
140146
if Fluent.windows?
141-
supervisor_sigusr2_handler
147+
graceful_reload
142148
else
143149
Process.kill :USR2, Process.pid
144150
end
@@ -187,7 +193,11 @@ def install_supervisor_signal_handlers
187193

188194
trap :USR2 do
189195
$log.debug 'fluentd supervisor process got SIGUSR2'
190-
supervisor_sigusr2_handler
196+
if Fluent.windows?
197+
graceful_reload
198+
else
199+
start_new_supervisor
200+
end
191201
end
192202
end
193203

@@ -254,7 +264,7 @@ def install_windows_event_handler
254264
when :usr1
255265
supervisor_sigusr1_handler
256266
when :usr2
257-
supervisor_sigusr2_handler
267+
graceful_reload
258268
when :cont
259269
supervisor_dump_handler_for_windows
260270
when :stop_event_thread
@@ -284,7 +294,7 @@ def supervisor_sigusr1_handler
284294
send_signal_to_workers(:USR1)
285295
end
286296

287-
def supervisor_sigusr2_handler
297+
def graceful_reload
288298
conf = nil
289299
t = Thread.new do
290300
$log.info 'Reloading new config'
@@ -312,6 +322,17 @@ def supervisor_sigusr2_handler
312322
$log.error "Failed to reload config file: #{e}"
313323
end
314324

325+
def start_new_supervisor
326+
send_signal_to_workers(:USR2)
327+
sleep 5 # TODO Wait until all workers finish shifting to the limited mode. How?
328+
@is_limited_mode = true
329+
commands = [ServerEngine.ruby_bin_path, $0] + ARGV
330+
env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN}
331+
Process.spawn(env_to_add, commands.join(" "))
332+
rescue => e
333+
$log.error "Failed to start a new supervisor: #{e}"
334+
end
335+
315336
def supervisor_dump_handler_for_windows
316337
# As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
317338
# and it is implemented before the implementation of the function for Windows.
@@ -834,7 +855,7 @@ def install_main_process_signal_handlers
834855
end
835856

836857
trap :USR2 do
837-
reload_config
858+
shift_to_limited_mode
838859
end
839860

840861
trap :CONT do
@@ -893,6 +914,16 @@ def flush_buffer
893914
end
894915
end
895916

917+
def shift_to_limited_mode
918+
Thread.new do
919+
begin
920+
Fluent::Engine.shift_to_limited_mode!
921+
rescue Exception => e
922+
$log.warn "failed to shift to the limited mode: #{e}"
923+
end
924+
end
925+
end
926+
896927
def reload_config
897928
Thread.new do
898929
$log.debug('worker got SIGUSR2')

0 commit comments

Comments
 (0)