Skip to content

Commit eafbc19

Browse files
committed
Add with-source-only feature
* Mainly intended to be used by command option `--with-source-only` * We can use system-config option `with_source_only` as well * Not supported on Windows It launches Fluentd with Input plugins only. Here is the specification. * Those Input plugins emits the events to SourceOnlyBufferAgent. * The events is kept in the buf_file during the source-only-mode. * Send SIGWINCH to the supervisor to cancels the mode. * After canceled, the new agent starts to load the buffer. Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent 5c3b71f commit eafbc19

File tree

10 files changed

+339
-51
lines changed

10 files changed

+339
-51
lines changed

lib/fluent/command/fluentd.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@
127127
cmd_opts[:without_source] = b
128128
}
129129

130+
unless Fluent.windows?
131+
op.on('--with-source-only', "Invoke a fluentd only with input plugins. The data is stored in a temporary buffer. Send SIGWINCH to cancel this mode and process the data (Not supported on Windows).", TrueClass) {|b|
132+
cmd_opts[:with_source_only] = b
133+
}
134+
end
135+
130136
op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
131137
if (s == 'yaml') || (s == 'yml')
132138
cmd_opts[:config_file_type] = s.to_sym

lib/fluent/engine.rb

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def initialize
4343
@system_config = SystemConfig.new
4444

4545
@supervisor_mode = false
46+
47+
@root_agent_mutex = Mutex.new
4648
end
4749

4850
MAINLOOP_SLEEP_INTERVAL = 0.3
@@ -133,7 +135,15 @@ def emit_stream(tag, es)
133135
end
134136

135137
def flush!
136-
@root_agent.flush!
138+
@root_agent_mutex.synchronize do
139+
@root_agent.flush!
140+
end
141+
end
142+
143+
def cancel_source_only!
144+
@root_agent_mutex.synchronize do
145+
@root_agent.cancel_source_only!
146+
end
137147
end
138148

139149
def now
@@ -144,7 +154,9 @@ def now
144154
def run
145155
begin
146156
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
147-
start
157+
@root_agent_mutex.synchronize do
158+
start
159+
end
148160

149161
@fluent_log_event_router.start
150162

@@ -158,47 +170,51 @@ def run
158170
raise
159171
end
160172

161-
stop_phase(@root_agent)
173+
@root_agent_mutex.synchronize do
174+
stop_phase(@root_agent)
175+
end
162176
end
163177

164178
# @param conf [Fluent::Config]
165179
# @param supervisor [Bool]
166180
# @reutrn nil
167181
def reload_config(conf, supervisor: false)
168-
# configure first to reduce down time while restarting
169-
new_agent = RootAgent.new(log: log, system_config: @system_config)
170-
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)
171-
172-
ret.all_plugins.each do |plugin|
173-
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
174-
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
182+
@root_agent_mutex.synchronize do
183+
# configure first to reduce down time while restarting
184+
new_agent = RootAgent.new(log: log, system_config: @system_config)
185+
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)
186+
187+
ret.all_plugins.each do |plugin|
188+
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
189+
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
190+
end
175191
end
176-
end
177192

178-
# Assign @root_agent to new root_agent
179-
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
180-
old_agent, @root_agent = @root_agent, new_agent
181-
begin
182-
@root_agent.configure(conf)
183-
rescue
184-
@root_agent = old_agent
185-
raise
186-
end
193+
# Assign @root_agent to new root_agent
194+
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
195+
old_agent, @root_agent = @root_agent, new_agent
196+
begin
197+
@root_agent.configure(conf)
198+
rescue
199+
@root_agent = old_agent
200+
raise
201+
end
187202

188-
unless @suppress_config_dump
189-
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
190-
end
203+
unless @suppress_config_dump
204+
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
205+
end
191206

192-
# supervisor doesn't handle actual data. so the following code is unnecessary.
193-
if supervisor
194-
old_agent.shutdown # to close thread created in #configure
195-
return
196-
end
207+
# supervisor doesn't handle actual data. so the following code is unnecessary.
208+
if supervisor
209+
old_agent.shutdown # to close thread created in #configure
210+
return
211+
end
197212

198-
stop_phase(old_agent)
213+
stop_phase(old_agent)
199214

200-
$log.info 'restart fluentd worker', worker: worker_id
201-
start_phase(new_agent)
215+
$log.info 'restart fluentd worker', worker: worker_id
216+
start_phase(new_agent)
217+
end
202218
end
203219

204220
def stop

lib/fluent/env.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515
#
1616

17+
require 'securerandom'
18+
1719
require 'serverengine/utils'
1820
require 'fluent/oj_options'
1921

@@ -25,6 +27,7 @@ module Fluent
2527
DEFAULT_OJ_OPTIONS = Fluent::OjOptions.load_env
2628
DEFAULT_DIR_PERMISSION = 0755
2729
DEFAULT_FILE_PERMISSION = 0644
30+
INSTANCE_ID = ENV['FLUENT_INSTANCE_ID'] || SecureRandom.uuid
2831

2932
def self.windows?
3033
ServerEngine.windows?

lib/fluent/plugin/out_buffer.rb

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#
2+
# Fluentd
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
require 'fluent/plugin/output'
18+
19+
module Fluent::Plugin
20+
class BufferOutput < Output
21+
Fluent::Plugin.register_output("buffer", self)
22+
helpers :event_emitter
23+
24+
config_section :buffer do
25+
config_set_default :@type, "file"
26+
config_set_default :chunk_keys, ["tag"]
27+
config_set_default :flush_mode, :interval
28+
config_set_default :flush_interval, 10
29+
end
30+
31+
def multi_workers_ready?
32+
true
33+
end
34+
35+
def write(chunk)
36+
return if chunk.empty?
37+
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
38+
end
39+
end
40+
end

lib/fluent/plugin/output.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,6 +1384,7 @@ def retry_state(randomize)
13841384
end
13851385

13861386
def submit_flush_once
1387+
return unless @buffer_config.flush_thread_count > 0
13871388
# Without locks: it is rough but enough to select "next" writer selection
13881389
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
13891390
state = @output_flush_threads[@output_flush_thread_current_position]
@@ -1406,6 +1407,7 @@ def force_flush
14061407
end
14071408

14081409
def submit_flush_all
1410+
return unless @buffer_config.flush_thread_count > 0
14091411
while !@retry && @buffer.queued?
14101412
submit_flush_once
14111413
sleep @buffer_config.flush_thread_burst_interval

lib/fluent/plugin_helper/event_emitter.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ module EventEmitter
2626

2727
def router
2828
@_event_emitter_used_actually = true
29+
30+
return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router
31+
2932
if @_event_emitter_lazy_init
3033
@router = @primary_instance.router
3134
end
@@ -48,6 +51,14 @@ def event_emitter_used_actually?
4851
@_event_emitter_used_actually
4952
end
5053

54+
def event_emitter_apply_source_only
55+
@_event_emitter_force_source_only_router = true
56+
end
57+
58+
def event_emitter_cancel_source_only
59+
@_event_emitter_force_source_only_router = false
60+
end
61+
5162
def event_emitter_router(label_name)
5263
if label_name
5364
if label_name == "@ROOT"
@@ -72,6 +83,7 @@ def initialize
7283
super
7384
@_event_emitter_used_actually = false
7485
@_event_emitter_lazy_init = false
86+
@_event_emitter_force_source_only_router = false
7587
@router = nil
7688
end
7789

0 commit comments

Comments
 (0)