Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge buffered and non buffered outputs #1200

Merged
merged 6 commits into from
Sep 2, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions example/in_forward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@

<match test>
@type stdout
# <buffer>
# flush_interval 10s
# </buffer>
</match>
16 changes: 10 additions & 6 deletions example/out_buffered_null.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
</source>

<match dummy.**>
@type buffered_null
try_flush_interval 60
flush_interval 60
buffer_chunk_limit 1k
buffer_queue_limit 2
@type null
<buffer>
flush_interval 60s
chunk_limit_size 1k
total_limit_size 4k
</buffer>
</match>

<label error_log>
<match **>
@type stdout # or buffered_stdout
@type stdout
# <buffer>
# flush_interval 1s
# </buffer>
</match>
</label>

Expand Down
59 changes: 0 additions & 59 deletions lib/fluent/plugin/out_buffered_null.rb

This file was deleted.

70 changes: 0 additions & 70 deletions lib/fluent/plugin/out_buffered_stdout.rb

This file was deleted.

38 changes: 38 additions & 0 deletions lib/fluent/plugin/out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,48 @@

module Fluent::Plugin
class NullOutput < Output
# This plugin is for tests of non-buffered/buffered plugins
Fluent::Plugin.register_output('null', self)
Fluent::Plugin.register_output('buffered_null', self)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why buffered_null is needed?
This is introduced since v0.14 and there is no production use.
So we don't need to keep the compatibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.


config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_limit_size, 10 * 1024
end

def prefer_buffered_processing
false
end

def prefer_delayed_commit
@delayed
end

attr_accessor :feed_proc, :delayed

def initialize
super
@delayed = false
@feed_proc = nil
end

def process(tag, es)
# Do nothing
end

def write(chunk)
if @feed_proc
@feed_proc.call(chunk)
end
end

def try_write(chunk)
if @feed_proc
@feed_proc.call(chunk)
end
# not to commit chunks for testing
# commit_write(chunk.unique_id)
end
end
end
41 changes: 39 additions & 2 deletions lib/fluent/plugin/out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,67 @@
module Fluent::Plugin
class StdoutOutput < Output
Fluent::Plugin.register_output('stdout', self)
Fluent::Plugin.register_output('buffered_stdout', self)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


helpers :inject, :formatter, :compat_parameters

DEFAULT_FORMAT_TYPE = 'json'

config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_limit_size, 10 * 1024
end

config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end

def prefer_buffered_processing
false
end

def prefer_delayed_commit
@delayed
end

attr_accessor :delayed

def initialize
super
@delayed = false
end

def configure(conf)
if conf['output_type'] && !conf['format']
conf['format'] = conf['output_type']
end
compat_parameters_convert(conf, :inject, :formatter)

super

@formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
end

def process(tag, es)
es.each {|time,record|
r = inject_values_to_record(tag, time, record)
$log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, r).chomp}\n"
$log.write(format(tag, time, record))
}
$log.flush
end

def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
"#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
end

def write(chunk)
chunk.write_to($log)
end

def try_write(chunk)
chunk.write_to($log)
commit_write(chunk.unique_id)
end
end
end
4 changes: 4 additions & 0 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ def test_configure
"@id"=>"null",
"@type" => "null"
},
"buffer_queue_length" => 0,
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "null",
Expand Down Expand Up @@ -285,6 +287,8 @@ def get(uri, header = {})
"@id" => "null",
"@type" => "null"
},
"buffer_queue_length" => 0,
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "null",
Expand Down
79 changes: 0 additions & 79 deletions test/plugin/test_out_buffered_null.rb

This file was deleted.

Loading