Skip to content

Commit

Permalink
Merge pull request #2709 from ganmacs/remove-unnecessary-config-reload
Browse files Browse the repository at this point in the history
Remove unnecessary config reload
  • Loading branch information
ganmacs authored Dec 3, 2019
2 parents 863ecea + c98b348 commit 29ad441
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 97 deletions.
97 changes: 40 additions & 57 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ def kill_worker
end

def supervisor_dump_config_handler
$log.info config[:fluentd_conf].to_s
$log.info config[:fluentd_conf]
end

def supervisor_get_dump_config_handler
{conf: config[:fluentd_conf].to_s}
{conf: config[:fluentd_conf]}
end
end

Expand All @@ -234,7 +234,6 @@ def after_start

class Supervisor
def self.load_config(path, params = {})

pre_loadtime = 0
pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
pre_config_mtime = nil
Expand All @@ -246,21 +245,6 @@ def self.load_config(path, params = {})
return params['pre_conf']
end

config_fname = File.basename(path)
config_basedir = File.dirname(path)
# Assume fluent.conf encoding is UTF-8
config_data = File.open(path, "r:#{params['conf_encoding']}:utf-8") {|f| f.read }
inline_config = params['inline_config']
if inline_config
config_data << "\n" << inline_config.gsub("\\n","\n")
end
fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
system_config = SystemConfig.create(fluentd_conf)

# these params must NOT be configured via system config here.
# these may be overridden by command line params.
workers = params['workers']
root_dir = params['root_dir']
log_level = params['log_level']
suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']

Expand All @@ -269,9 +253,6 @@ def self.load_config(path, params = {})
chgroup = params['chgroup']
log_rotate_age = params['log_rotate_age']
log_rotate_size = params['log_rotate_size']
rpc_endpoint = system_config.rpc_endpoint
enable_get_dump = system_config.enable_get_dump
counter_server = system_config.counter_server

log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
logger_initializer = Supervisor::LoggerInitializer.new(
Expand All @@ -288,43 +269,41 @@ def self.load_config(path, params = {})
# ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
pid_path = params['daemonize']
daemonize = !!params['daemonize']
main_cmd = params['main_cmd']
signame = params['signame']

se_config = {
worker_type: 'spawn',
workers: workers,
log_stdin: false,
log_stdout: false,
log_stderr: false,
enable_heartbeat: true,
auto_heartbeat: false,
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: root_dir,
logger: logger,
log: logger.out,
log_path: log_path,
log_level: log_level,
logger_initializer: logger_initializer,
chuser: chuser,
chgroup: chgroup,
chumask: 0,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
daemonize: daemonize,
rpc_endpoint: rpc_endpoint,
counter_server: counter_server,
enable_get_dump: enable_get_dump,
windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
File.join(File.dirname(__FILE__), 'daemon.rb'),
ServerModule.name,
WorkerModule.name,
path,
JSON.dump(params)],
command_sender: command_sender,
fluentd_conf: fluentd_conf,
main_cmd: main_cmd,
signame: signame,
worker_type: 'spawn',
workers: params['workers'],
log_stdin: false,
log_stdout: false,
log_stderr: false,
enable_heartbeat: true,
auto_heartbeat: false,
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: params['root_dir'],
logger: logger,
log: logger.out,
log_path: log_path,
log_level: log_level,
logger_initializer: logger_initializer,
chuser: chuser,
chgroup: chgroup,
chumask: 0,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
daemonize: daemonize,
rpc_endpoint: params['rpc_endpoint'],
counter_server: params['counter_server'],
enable_get_dump: params['enable_get_dump'],
windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
File.join(File.dirname(__FILE__), 'daemon.rb'),
ServerModule.name,
WorkerModule.name,
path,
JSON.dump(params)],
command_sender: command_sender,
fluentd_conf: params['fluentd_conf'],
main_cmd: params['main_cmd'],
signame: params['signame'],
}
if daemonize
se_config[:pid_path] = pid_path
Expand All @@ -337,7 +316,7 @@ def self.load_config(path, params = {})
pre_params['pre_conf'] = nil
params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

return se_config
se_config
end

class LoggerInitializer
Expand Down Expand Up @@ -640,11 +619,15 @@ def supervise
'use_v1_config' => @use_v1_config,
'conf_encoding' => @conf_encoding,
'signame' => @signame,
'fluentd_conf' => @conf.to_s,

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
'rpc_endpoint' => @system_config.rpc_endpoint,
'enable_get_dump' => @system_config.enable_get_dump,
'counter_server' => @system_config.counter_server,
}

se = ServerEngine.create(ServerModule, WorkerModule){
Expand Down
40 changes: 0 additions & 40 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -372,46 +372,6 @@ def test_load_config_for_daemonize
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
end

def test_load_config_with_multibyte_string
tmp_path = "#{TMP_DIR}/dir/test_multibyte_config.conf"
conf_str = %[
<source>
@type forward
@id forward_input
@label @INPUT
</source>
<label @INPUT>
<filter>
@type record_transformer
<record>
message こんにちは. ${record["name"]} has made a order of ${record["item"]} just now.
</record>
</filter>
<match>
@type stdout
</match>
</label>
]
FileUtils.mkdir_p(File.dirname(tmp_path))
File.open(tmp_path, "w:utf-8") {|file| file.write(conf_str) }

params = {}
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_path, params) }

se_config = load_config_proc.call
conf = se_config[:fluentd_conf]
label = conf.elements.detect {|e| e.name == "label" }
filter = label.elements.detect {|e| e.name == "filter" }
record_transformer = filter.elements.detect {|e| e.name = "record_transformer" }
assert_equal(Encoding::UTF_8, record_transformer["message"].encoding)
end

def test_logger
opts = Fluent::Supervisor.default_options
sv = Fluent::Supervisor.new(opts)
Expand Down

0 comments on commit 29ad441

Please sign in to comment.