Skip to content

Commit dce0177

Browse files
daipomWatson1978
andcommitted
Zero downtime restart
Add a new feature: Zero downtime update/reload 1. The supervisor receives SIGUSR2. 2. Spawn a new supervisor. 3. Take over shared sockets. 4. Launch new workers, and stop old processes in parallel. * Launch new workers with source-only mode * Limit to zero_downtime_restart_ready? input plugin * Send SIGTERM to the old supervisor after 10s delay from 3. 5. The old supervisor stops and sends SIGWINCH to the new one. 6. The new workers run fully. Problem to solve: Updating Fluentd or reloading a config causes downtime. Plugins that receive data as a server, such as `in_udp`, `in_tcp`, and `in_syslog`, cannot receive data during this time. This means that the data sent by a client is lost during this time unless the client has a re-sending feature. This makes updating Fluentd or reloading a config difficult in some cases. Note: need these feature * #4661 * treasure-data/serverengine#146 Co-authored-by: Shizuo Fujita <fujita@clear-code.com> Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent 76a11ea commit dce0177

File tree

10 files changed

+447
-30
lines changed

10 files changed

+447
-30
lines changed

lib/fluent/engine.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def initialize
5151

5252
attr_reader :root_agent, :system_config, :supervisor_mode
5353

54-
def init(system_config, supervisor_mode: false)
54+
def init(system_config, supervisor_mode: false, start_in_parallel: false)
5555
@system_config = system_config
5656
@supervisor_mode = supervisor_mode
5757

@@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false)
6060

6161
@log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil?
6262

63-
@root_agent = RootAgent.new(log: log, system_config: @system_config)
63+
@root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel)
6464

6565
self
6666
end

lib/fluent/plugin/in_syslog.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ def multi_workers_ready?
156156
true
157157
end
158158

159+
def zero_downtime_restart_ready?
160+
true
161+
end
162+
159163
def start
160164
super
161165

lib/fluent/plugin/in_tcp.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ def multi_workers_ready?
101101
true
102102
end
103103

104+
def zero_downtime_restart_ready?
105+
true
106+
end
107+
104108
def start
105109
super
106110

lib/fluent/plugin/in_udp.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def multi_workers_ready?
6565
true
6666
end
6767

68+
def zero_downtime_restart_ready?
69+
true
70+
end
71+
6872
def start
6973
super
7074

lib/fluent/plugin/input.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ def metric_callback(es)
7070
def multi_workers_ready?
7171
false
7272
end
73+
74+
def zero_downtime_restart_ready?
75+
false
76+
end
7377
end
7478
end
7579
end

lib/fluent/root_agent.rb

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,43 @@ module Fluent
4848
class RootAgent < Agent
4949
ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label
5050

51-
def initialize(log:, system_config: SystemConfig.new)
51+
class SourceOnlyMode
52+
DISABELD = 0
53+
NORMAL = 1
54+
ONLY_ZERO_DOWNTIME_RESTART_READY = 2
55+
56+
def initialize(with_source_only, start_in_parallel)
57+
if start_in_parallel
58+
@mode = ONLY_ZERO_DOWNTIME_RESTART_READY
59+
elsif with_source_only
60+
@mode = NORMAL
61+
else
62+
@mode = DISABELD
63+
end
64+
end
65+
66+
def enabled?
67+
@mode != DISABELD
68+
end
69+
70+
def only_zero_downtime_restart_ready?
71+
@mode == ONLY_ZERO_DOWNTIME_RESTART_READY
72+
end
73+
74+
def disable!
75+
@mode = DISABELD
76+
end
77+
end
78+
79+
def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false)
5280
super(log: log)
5381

5482
@labels = {}
5583
@inputs = []
5684
@suppress_emit_error_log_interval = 0
5785
@next_emit_error_log_time = nil
5886
@without_source = system_config.without_source || false
59-
@with_source_only = system_config.with_source_only || false
87+
@source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel)
6088
@source_only_buffer_agent = nil
6189
@enable_input_metrics = system_config.enable_input_metrics || false
6290

@@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new)
6795
attr_reader :labels
6896

6997
def source_only_router
70-
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
98+
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled?
7199
@source_only_buffer_agent.event_router
72100
end
73101

@@ -154,7 +182,7 @@ def configure(conf)
154182

155183
super
156184

157-
setup_source_only_buffer_agent if @with_source_only
185+
setup_source_only_buffer_agent if @source_only_mode.enabled?
158186

159187
# initialize <source> elements
160188
if @without_source
@@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent
187215
end
188216

189217
def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
218+
only_zero_downtime_restart_ready = false
219+
190220
unless kind_or_agent_list
191-
if @with_source_only
221+
if @source_only_mode.enabled?
192222
kind_or_agent_list = [:input, @source_only_buffer_agent]
223+
only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready?
193224
elsif @source_only_buffer_agent
194225
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
195226
kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
@@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
214245
end
215246
display_kind = (kind == :output_with_router ? :output : kind)
216247
list.each do |instance|
248+
if only_zero_downtime_restart_ready
249+
next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready?
250+
end
217251
yield instance, display_kind
218252
end
219253
end
@@ -257,7 +291,7 @@ def flush!
257291
end
258292

259293
def cancel_source_only!
260-
unless @with_source_only
294+
unless @source_only_mode.enabled?
261295
log.info "do nothing for canceling with-source-only because the current mode is not with-source-only."
262296
return
263297
end
@@ -285,7 +319,7 @@ def cancel_source_only!
285319
setup_source_only_buffer_agent(flush: true)
286320
start(kind_or_agent_list: [@source_only_buffer_agent])
287321

288-
@with_source_only = false
322+
@source_only_mode.disable!
289323
end
290324

291325
def shutdown(kind_or_agent_list: nil)
@@ -378,7 +412,7 @@ def add_source(type, conf)
378412
# See also 'fluentd/plugin/input.rb'
379413
input.context_router = @event_router
380414
input.configure(conf)
381-
input.event_emitter_apply_source_only if @with_source_only
415+
input.event_emitter_apply_source_only if @source_only_mode.enabled?
382416
if @enable_input_metrics
383417
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
384418
end

0 commit comments

Comments
 (0)