Skip to content

Commit 7ab49c3

Browse files
committed
fluentd command: add --with-source-only
It launches Fluentd with Input plugins only. Here is the specification. * Those Input plugins emits the events to SourceOnlyBufferAgent. * The events is kept in the buf_file during the source-only-mode. * SIGRTMIN(34) cancels the mode. * After canceled, the new agent starts to load the buffer. Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent 5c3b71f commit 7ab49c3

File tree

10 files changed

+247
-20
lines changed

10 files changed

+247
-20
lines changed

lib/fluent/command/fluentd.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@
127127
cmd_opts[:without_source] = b
128128
}
129129

130+
op.on('--with-source-only', "invoke a fluentd only with input plugins", TrueClass) {|b|
131+
cmd_opts[:with_source_only] = b
132+
}
133+
130134
op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
131135
if (s == 'yaml') || (s == 'yml')
132136
cmd_opts[:config_file_type] = s.to_sym

lib/fluent/engine.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ def flush!
136136
@root_agent.flush!
137137
end
138138

139+
def cancel_source_only!
140+
@root_agent.cancel_source_only!
141+
end
142+
139143
def now
140144
# TODO thread update
141145
Fluent::EventTime.now

lib/fluent/plugin/buf_file.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class FileBuffer < Fluent::Plugin::Buffer
4242
config_param :file_permission, :string, default: nil # '0644' (Fluent::DEFAULT_FILE_PERMISSION)
4343
config_param :dir_permission, :string, default: nil # '0755' (Fluent::DEFAULT_DIR_PERMISSION)
4444

45+
attr_reader :buffer_path
46+
4547
def initialize
4648
super
4749
@symlink_path = nil

lib/fluent/plugin/out_buffer.rb

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#
2+
# Fluentd
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
require 'fluent/plugin/output'
18+
19+
module Fluent::Plugin
20+
class BufferOutput < Output
21+
Fluent::Plugin.register_output("buffer", self)
22+
helpers :event_emitter
23+
24+
desc "Try to remove the buffer directory after terminate." +
25+
" Mainly for the --with-source-only feature."
26+
config_param :cleanup_after_shutdown, :bool, default: false
27+
28+
config_section :buffer do
29+
config_set_default :@type, "file"
30+
config_set_default :chunk_keys, ["tag"]
31+
config_set_default :flush_mode, :interval
32+
config_set_default :flush_interval, 10
33+
end
34+
35+
def multi_workers_ready?
36+
true
37+
end
38+
39+
def initialize
40+
super
41+
42+
@buffer_path = nil
43+
end
44+
45+
def configure(conf)
46+
super
47+
48+
raise Fluent::ConfigError, "The buffer plugin does not be supported. It must have 'buffer_path' getter. You can use 'file' buffer." unless @buffer.respond_to?(:buffer_path)
49+
50+
@buffer_path = @buffer.buffer_path
51+
end
52+
53+
def write(chunk)
54+
return if chunk.empty?
55+
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
56+
end
57+
58+
def terminate
59+
super
60+
61+
return unless cleanup_after_shutdown
62+
63+
unless Dir.empty?(@buffer_path)
64+
if @buffer_config.flush_at_shutdown
65+
log.warn "some buffer files remain in #{@buffer_path}, so cancel cleanup the directory." +
66+
" Please consider saving or recovering the buffer files in the directory."
67+
end
68+
return
69+
end
70+
71+
begin
72+
FileUtils.remove_dir(@buffer_path)
73+
rescue => e
74+
log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e
75+
end
76+
end
77+
end
78+
end

lib/fluent/plugin/output.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,6 +1384,7 @@ def retry_state(randomize)
13841384
end
13851385

13861386
def submit_flush_once
1387+
return unless @buffer_config.flush_thread_count > 0
13871388
# Without locks: it is rough but enough to select "next" writer selection
13881389
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
13891390
state = @output_flush_threads[@output_flush_thread_current_position]

lib/fluent/plugin_helper/event_emitter.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ module EventEmitter
2626

2727
def router
2828
@_event_emitter_used_actually = true
29+
30+
return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router
31+
2932
if @_event_emitter_lazy_init
3033
@router = @primary_instance.router
3134
end
@@ -48,6 +51,14 @@ def event_emitter_used_actually?
4851
@_event_emitter_used_actually
4952
end
5053

54+
def event_emitter_set_source_only
55+
@_event_emitter_force_source_only_router = true
56+
end
57+
58+
def event_emitter_cancel_source_only
59+
@_event_emitter_force_source_only_router = false
60+
end
61+
5162
def event_emitter_router(label_name)
5263
if label_name
5364
if label_name == "@ROOT"
@@ -72,6 +83,7 @@ def initialize
7283
super
7384
@_event_emitter_used_actually = false
7485
@_event_emitter_lazy_init = false
86+
@_event_emitter_force_source_only_router = false
7587
@router = nil
7688
end
7789

lib/fluent/root_agent.rb

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
require 'fluent/plugin'
2323
require 'fluent/system_config'
2424
require 'fluent/time'
25+
require 'fluent/source_only_buffer_agent'
2526

2627
module Fluent
2728
#
@@ -54,17 +55,22 @@ def initialize(log:, system_config: SystemConfig.new)
5455
@inputs = []
5556
@suppress_emit_error_log_interval = 0
5657
@next_emit_error_log_time = nil
57-
@without_source = false
58-
@enable_input_metrics = false
58+
@without_source = system_config.without_source || false
59+
@with_source_only = system_config.with_source_only || false
60+
@source_only_buffer_agent = nil
61+
@enable_input_metrics = system_config.enable_input_metrics || false
5962

6063
suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
61-
@without_source = system_config.without_source unless system_config.without_source.nil?
62-
@enable_input_metrics = !!system_config.enable_input_metrics
6364
end
6465

6566
attr_reader :inputs
6667
attr_reader :labels
6768

69+
def source_only_router
70+
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
71+
@source_only_buffer_agent.event_router
72+
end
73+
6874
def configure(conf)
6975
used_worker_ids = []
7076
available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
@@ -148,6 +154,8 @@ def configure(conf)
148154

149155
super
150156

157+
setup_source_only_buffer_agent if @with_source_only
158+
151159
# initialize <source> elements
152160
if @without_source
153161
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
@@ -169,16 +177,29 @@ def setup_error_label(e)
169177
@error_collector = error_label.event_router
170178
end
171179

172-
def lifecycle(desc: false, kind_callback: nil)
173-
kind_or_label_list = if desc
174-
[:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten
175-
else
176-
[:input, :output_with_router, @labels.values, :filter, :output].flatten
177-
end
178-
kind_or_label_list.each do |kind|
180+
def setup_source_only_buffer_agent(flush: false)
181+
@source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config)
182+
@source_only_buffer_agent.configure(flush: flush)
183+
end
184+
185+
def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
186+
unless kind_or_agent_list
187+
if @with_source_only
188+
kind_or_agent_list = [:input, @source_only_buffer_agent]
189+
elsif @source_only_buffer_agent
190+
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
191+
kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
192+
else
193+
kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten
194+
end
195+
196+
kind_or_agent_list.reverse! if desc
197+
end
198+
199+
kind_or_agent_list.each do |kind|
179200
if kind.respond_to?(:lifecycle)
180-
label = kind
181-
label.lifecycle(desc: desc) do |plugin, display_kind|
201+
agent = kind
202+
agent.lifecycle(desc: desc) do |plugin, display_kind|
182203
yield plugin, display_kind
183204
end
184205
else
@@ -198,8 +219,8 @@ def lifecycle(desc: false, kind_callback: nil)
198219
end
199220
end
200221

201-
def start
202-
lifecycle(desc: true) do |i| # instance
222+
def start(kind_or_agent_list: nil)
223+
lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance
203224
i.start unless i.started?
204225
# Input#start sometimes emits lots of events with in_tail/`read_from_head true` case
205226
# and it causes deadlock for small buffer/queue output. To avoid such problem,
@@ -231,13 +252,45 @@ def flush!
231252
flushing_threads.each{|t| t.join }
232253
end
233254

234-
def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
255+
def cancel_source_only!
256+
# TODO exclusive lock
257+
if @with_source_only
258+
log.info "cancel --with-source-only mode and start the other plugins"
259+
@with_source_only = false
260+
start
261+
262+
lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only)
263+
264+
# Want to make sure that the source_only_router finishes all process.
265+
# Strictly speaking, we must make a forwarding feature for the source_only_router
266+
# to make sure it.
267+
# However, it would need exclusive lock for EventRouter and worsen its performance.
268+
# So, just sleep a few seconds here.
269+
sleep 5
270+
271+
shutdown(kind_or_agent_list: [@source_only_buffer_agent])
272+
@source_only_buffer_agent = nil
273+
end
274+
275+
if @source_only_buffer_agent
276+
log.info "do nothing for canceling --with-source-only because it is already canceled, and the loading agent already exists"
277+
return
278+
end
279+
280+
# TODO This agent should stop after flushing its all buffer.
281+
log.info "starts the loading agent for --with-source-only"
282+
setup_source_only_buffer_agent(flush: true)
283+
start(kind_or_agent_list: [@source_only_buffer_agent])
284+
end
285+
286+
def shutdown(kind_or_agent_list: nil)
287+
# Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
235288
# These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
236289
# if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
237290
# Plugins should be separated and be in sandbox to protect data in each plugins/buffers.
238291

239292
lifecycle_safe_sequence = ->(method, checker) {
240-
lifecycle do |instance, kind|
293+
lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind|
241294
begin
242295
log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
243296
instance.__send__(method) unless instance.__send__(checker)
@@ -260,7 +313,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a
260313
operation_threads.each{|t| t.join }
261314
operation_threads.clear
262315
}
263-
lifecycle(kind_callback: callback) do |instance, kind|
316+
lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind|
264317
t = Thread.new do
265318
Thread.current.abort_on_exception = true
266319
begin
@@ -318,6 +371,7 @@ def add_source(type, conf)
318371
# See also 'fluentd/plugin/input.rb'
319372
input.context_router = @event_router
320373
input.configure(conf)
374+
input.event_emitter_set_source_only if @with_source_only
321375
if @enable_input_metrics
322376
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
323377
end
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#
2+
# Fluentd
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
require 'securerandom'
18+
19+
require 'fluent/agent'
20+
require 'fluent/system_config'
21+
22+
module Fluent
23+
class SourceOnlyBufferAgent < Agent
24+
BUFFER_DIR_NAME = SecureRandom.uuid
25+
26+
def initialize(log:, system_config:)
27+
super(log: log)
28+
29+
@buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME)
30+
end
31+
32+
def configure(flush: false)
33+
super(
34+
Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [
35+
Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT', 'cleanup_after_shutdown' => 'true'}, [
36+
Config::Element.new('buffer', '', {
37+
'path' => @buffer_path,
38+
'flush_at_shutdown' => flush ? 'true' : 'false',
39+
'flush_thread_count' => flush ? 1 : 0,
40+
'overflow_action' => "drop_oldest_chunk",
41+
}, [])
42+
])
43+
])
44+
)
45+
end
46+
end
47+
end

0 commit comments

Comments
 (0)