Skip to content

Add with-source-only feature #4661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@
cmd_opts[:without_source] = b
}

unless Fluent.windows?
op.on('--with-source-only', "Invoke a fluentd only with input plugins. The data is stored in a temporary buffer. Send SIGWINCH to cancel this mode and process the data (Not supported on Windows).", TrueClass) {|b|
cmd_opts[:with_source_only] = b
}
end

op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
if (s == 'yaml') || (s == 'yml')
cmd_opts[:config_file_type] = s.to_sym
Expand Down
78 changes: 47 additions & 31 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def initialize
@system_config = SystemConfig.new

@supervisor_mode = false

@root_agent_mutex = Mutex.new
end

MAINLOOP_SLEEP_INTERVAL = 0.3
Expand Down Expand Up @@ -133,7 +135,15 @@ def emit_stream(tag, es)
end

def flush!
@root_agent.flush!
@root_agent_mutex.synchronize do
@root_agent.flush!
end
end

def cancel_source_only!
@root_agent_mutex.synchronize do
@root_agent.cancel_source_only!
end
end

def now
Expand All @@ -144,7 +154,9 @@ def now
def run
begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start
@root_agent_mutex.synchronize do
start
end

@fluent_log_event_router.start

Expand All @@ -158,47 +170,51 @@ def run
raise
end

stop_phase(@root_agent)
@root_agent_mutex.synchronize do
stop_phase(@root_agent)
end
end

# @param conf [Fluent::Config]
# @param supervisor [Bool]
# @reutrn nil
def reload_config(conf, supervisor: false)
# configure first to reduce down time while restarting
new_agent = RootAgent.new(log: log, system_config: @system_config)
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

ret.all_plugins.each do |plugin|
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
@root_agent_mutex.synchronize do
# configure first to reduce down time while restarting
new_agent = RootAgent.new(log: log, system_config: @system_config)
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

ret.all_plugins.each do |plugin|
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
end
end
end

# Assign @root_agent to new root_agent
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
old_agent, @root_agent = @root_agent, new_agent
begin
@root_agent.configure(conf)
rescue
@root_agent = old_agent
raise
end
# Assign @root_agent to new root_agent
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
old_agent, @root_agent = @root_agent, new_agent
begin
@root_agent.configure(conf)
rescue
@root_agent = old_agent
raise
end

unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end
unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end

# supervisor doesn't handle actual data. so the following code is unnecessary.
if supervisor
old_agent.shutdown # to close thread created in #configure
return
end
# supervisor doesn't handle actual data. so the following code is unnecessary.
if supervisor
old_agent.shutdown # to close thread created in #configure
return
end

stop_phase(old_agent)
stop_phase(old_agent)

$log.info 'restart fluentd worker', worker: worker_id
start_phase(new_agent)
$log.info 'restart fluentd worker', worker: worker_id
start_phase(new_agent)
end
end

def stop
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#

require 'securerandom'

require 'serverengine/utils'
require 'fluent/oj_options'

Expand All @@ -25,6 +27,7 @@ module Fluent
DEFAULT_OJ_OPTIONS = Fluent::OjOptions.load_env
DEFAULT_DIR_PERMISSION = 0755
DEFAULT_FILE_PERMISSION = 0644
INSTANCE_ID = ENV['FLUENT_INSTANCE_ID'] || SecureRandom.uuid

def self.windows?
ServerEngine.windows?
Expand Down
40 changes: 40 additions & 0 deletions lib/fluent/plugin/out_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferOutput < Output
Fluent::Plugin.register_output("buffer", self)
helpers :event_emitter

config_section :buffer do
config_set_default :@type, "file"
config_set_default :chunk_keys, ["tag"]
config_set_default :flush_mode, :interval
config_set_default :flush_interval, 10
end

def multi_workers_ready?
true
end

def write(chunk)
return if chunk.empty?
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ def retry_state(randomize)
end

def submit_flush_once
return unless @buffer_config.flush_thread_count > 0
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
Expand All @@ -1406,6 +1407,7 @@ def force_flush
end

def submit_flush_all
return unless @buffer_config.flush_thread_count > 0
while !@retry && @buffer.queued?
submit_flush_once
sleep @buffer_config.flush_thread_burst_interval
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,14 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

def event_emitter_apply_source_only
@_event_emitter_force_source_only_router = true
end

def event_emitter_cancel_source_only
@_event_emitter_force_source_only_router = false
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +83,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_source_only_router = false
@router = nil
end

Expand Down
Loading