@@ -43,6 +43,10 @@ def before_run
43
43
@rpc_endpoint = nil
44
44
@rpc_server = nil
45
45
@counter = nil
46
+ @socket_manager_server = nil
47
+ @starting_new_supervisor_without_downtime = false
48
+ @new_supervisor_pid = nil
49
+ start_in_parallel = ENV . key? ( "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" )
46
50
47
51
@fluentd_lock_dir = Dir . mktmpdir ( "fluentd-lock-" )
48
52
ENV [ 'FLUENTD_LOCK_DIR' ] = @fluentd_lock_dir
@@ -65,18 +69,31 @@ def before_run
65
69
66
70
if config [ :disable_shared_socket ]
67
71
$log. info "shared socket for multiple workers is disabled"
72
+ elsif start_in_parallel
73
+ begin
74
+ raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV . key? ( 'SERVERENGINE_SOCKETMANAGER_PATH' )
75
+ @socket_manager_server = ServerEngine ::SocketManager ::Server . take_over_another_server ( ENV [ 'SERVERENGINE_SOCKETMANAGER_PATH' ] )
76
+ $log. info "restart-without-downtime: took over the shared sockets" , path : ENV [ 'SERVERENGINE_SOCKETMANAGER_PATH' ]
77
+ rescue => e
78
+ $log. error "restart-without-downtime: cancel sequence because failed to take over the shared sockets" , error : e
79
+ raise
80
+ end
68
81
else
69
- server = ServerEngine ::SocketManager ::Server . open
70
- ENV [ 'SERVERENGINE_SOCKETMANAGER_PATH' ] = server . path . to_s
82
+ @socket_manager_server = ServerEngine ::SocketManager ::Server . open
83
+ ENV [ 'SERVERENGINE_SOCKETMANAGER_PATH' ] = @socket_manager_server . path . to_s
71
84
end
85
+
86
+ stop_parallel_old_supervisor_after_delay if start_in_parallel
72
87
end
73
88
74
89
def after_run
75
90
stop_windows_event_thread if Fluent . windows?
76
91
stop_rpc_server if @rpc_endpoint
77
92
stop_counter_server if @counter
78
93
cleanup_lock_dir
79
- Fluent ::Supervisor . cleanup_resources
94
+ Fluent ::Supervisor . cleanup_socketmanager_path unless @starting_new_supervisor_without_downtime
95
+
96
+ notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_without_downtime
80
97
end
81
98
82
99
def cleanup_lock_dir
@@ -138,7 +155,7 @@ def run_rpc_server
138
155
@rpc_server . mount_proc ( '/api/config.gracefulReload' ) { |req , res |
139
156
$log. debug "fluentd RPC got /api/config.gracefulReload request"
140
157
if Fluent . windows?
141
- supervisor_sigusr2_handler
158
+ graceful_reload
142
159
else
143
160
Process . kill :USR2 , Process . pid
144
161
end
@@ -172,6 +189,47 @@ def stop_counter_server
172
189
@counter . stop
173
190
end
174
191
192
+ def stop_parallel_old_supervisor_after_delay
193
+ # TODO if the new supervisor fails to start and this is not called,
194
+ # it would be necessary to update the pid in the PID file to the old one when daemonized.
195
+
196
+ Thread . new do
197
+ # Delay to avoid worker downtime as much as possible.
198
+ # Even if the downtime occurs, it is no problem because the socket buffer works,
199
+ # as long as the capacity is not exceeded.
200
+ sleep 10
201
+ old_pid = ENV [ "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" ] &.to_i
202
+ if old_pid
203
+ $log. info "restart-without-downtime: stop the old supervisor"
204
+ Process . kill :TERM , old_pid
205
+ end
206
+ rescue => e
207
+ $log. warn "restart-without-downtime: failed to stop the old supervisor." +
208
+ " If the old one does not exist, please send '34' signal to this new process to start to work fully." +
209
+ " If it exists, something went wrong. Please kill the old one manually." ,
210
+ error : e
211
+ end
212
+ end
213
+
214
+ def notify_new_supervisor_that_old_one_has_stopped
215
+ if config [ :pid_path ]
216
+ new_pid = File . read ( config [ :pid_path ] ) . to_i
217
+ else
218
+ raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid
219
+ new_pid = @new_supervisor_pid
220
+ end
221
+
222
+ $log. info "restart-without-downtime: notify the new supervisor (pid: #{ new_pid } ) that old one has stopped"
223
+ Process . kill 34 , new_pid
224
+ rescue => e
225
+ $log. error (
226
+ "restart-without-downtime: failed to notify the new supervisor." +
227
+ " Please send '34' signal to the new supervisor process manually" +
228
+ " if it does not start to work fully." ,
229
+ error : e
230
+ )
231
+ end
232
+
175
233
def install_supervisor_signal_handlers
176
234
return if Fluent . windows?
177
235
@@ -187,7 +245,11 @@ def install_supervisor_signal_handlers
187
245
188
246
trap :USR2 do
189
247
$log. debug 'fluentd supervisor process got SIGUSR2'
190
- supervisor_sigusr2_handler
248
+ if Fluent . windows?
249
+ graceful_reload
250
+ else
251
+ restart_without_downtime
252
+ end
191
253
end
192
254
193
255
trap 34 do
@@ -259,7 +321,7 @@ def install_windows_event_handler
259
321
when :usr1
260
322
supervisor_sigusr1_handler
261
323
when :usr2
262
- supervisor_sigusr2_handler
324
+ graceful_reload
263
325
when :cont
264
326
supervisor_dump_handler_for_windows
265
327
when :stop_event_thread
@@ -289,7 +351,7 @@ def supervisor_sigusr1_handler
289
351
send_signal_to_workers ( :USR1 )
290
352
end
291
353
292
- def supervisor_sigusr2_handler
354
+ def graceful_reload
293
355
conf = nil
294
356
t = Thread . new do
295
357
$log. info 'Reloading new config'
@@ -317,7 +379,38 @@ def supervisor_sigusr2_handler
317
379
$log. error "Failed to reload config file: #{ e } "
318
380
end
319
381
382
+ def restart_without_downtime
383
+ # TODO exclusive lock
384
+
385
+ $log. info "start restart-without-downtime sequence"
386
+
387
+ if @starting_new_supervisor_without_downtime
388
+ $log. warn "restart-without-downtime: canceled because it is already starting"
389
+ return
390
+ end
391
+ if ENV . key? ( "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" )
392
+ $log. warn "restart-without-downtime: canceled because the previous sequence is still running"
393
+ return
394
+ end
395
+
396
+ @starting_new_supervisor_without_downtime = true
397
+ commands = [ ServerEngine . ruby_bin_path , $0] + ARGV
398
+ env_to_add = {
399
+ "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine ::SocketManager ::INTERNAL_TOKEN ,
400
+ "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{ Process . pid } " ,
401
+ }
402
+ pid = Process . spawn ( env_to_add , commands . join ( " " ) )
403
+ @new_supervisor_pid = pid unless config [ :daemonize ]
404
+ rescue => e
405
+ $log. error "restart-without-downtime: failed" , error : e
406
+ @starting_new_supervisor_without_downtime = false
407
+ end
408
+
320
409
def cancel_source_only
410
+ if ENV . key? ( "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" )
411
+ $log. info "restart-without-downtime: done all sequences, now the new workers starts to work fully"
412
+ ENV . delete ( "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" )
413
+ end
321
414
send_signal_to_workers ( 34 )
322
415
end
323
416
@@ -509,12 +602,11 @@ def self.default_options
509
602
}
510
603
end
511
604
512
- def self . cleanup_resources
513
- unless Fluent . windows?
514
- if ENV . has_key? ( 'SERVERENGINE_SOCKETMANAGER_PATH' )
515
- FileUtils . rm_f ( ENV [ 'SERVERENGINE_SOCKETMANAGER_PATH' ] )
516
- end
517
- end
605
+ def self . cleanup_socketmanager_path
606
+ return if Fluent . windows?
607
+ return unless ENV . key? ( 'SERVERENGINE_SOCKETMANAGER_PATH' )
608
+
609
+ FileUtils . rm_f ( ENV [ 'SERVERENGINE_SOCKETMANAGER_PATH' ] )
518
610
end
519
611
520
612
def initialize ( cl_opt )
@@ -578,7 +670,7 @@ def run_supervisor(dry_run: false)
578
670
begin
579
671
ServerEngine ::Privilege . change ( @chuser , @chgroup )
580
672
MessagePackFactory . init ( enable_time_support : @system_config . enable_msgpack_time_support )
581
- Fluent ::Engine . init ( @system_config , supervisor_mode : true )
673
+ Fluent ::Engine . init ( @system_config , supervisor_mode : true , start_in_parallel : ENV . key? ( "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" ) )
582
674
Fluent ::Engine . run_configure ( @conf , dry_run : dry_run )
583
675
rescue Fluent ::ConfigError => e
584
676
$log. error 'config error' , file : @config_path , error : e
@@ -623,10 +715,10 @@ def run_worker
623
715
File . umask ( @chumask . to_i ( 8 ) )
624
716
end
625
717
MessagePackFactory . init ( enable_time_support : @system_config . enable_msgpack_time_support )
626
- Fluent ::Engine . init ( @system_config )
718
+ Fluent ::Engine . init ( @system_config , start_in_parallel : ENV . key? ( "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" ) )
627
719
Fluent ::Engine . run_configure ( @conf )
628
720
Fluent ::Engine . run
629
- self . class . cleanup_resources if @standalone_worker
721
+ self . class . cleanup_socketmanager_path if @standalone_worker
630
722
exit 0
631
723
end
632
724
end
@@ -844,7 +936,8 @@ def install_main_process_signal_handlers
844
936
end
845
937
846
938
trap :USR2 do
847
- reload_config
939
+ # Do nothing
940
+ # TODO consider suitable code for this
848
941
end
849
942
850
943
trap :CONT do
0 commit comments