Skip to content

Commit

Permalink
Skip record when Output#format returns nil
Browse files Browse the repository at this point in the history
Current Output API can't skip invalid record with custom format.
Old API uses emit for such purpose but new API doesn't have it.
Omitting invalid record in format is good for reducing formatting cost.
  • Loading branch information
repeatedly committed Feb 16, 2017
1 parent 3e3ff24 commit 56ae98b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
14 changes: 10 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,11 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
es.each do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= []
meta_and_data[meta] << format(tag, time, record)
records += 1
res = format(tag, time, record)
if res
meta_and_data[meta] << res
records += 1
end
end
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
Expand Down Expand Up @@ -885,8 +888,11 @@ def handle_stream_simple(tag, es, enqueue: false)
records = 0
data = []
es.each do |time, record|
data << format(tag, time, record)
records += 1
res = format(tag, time, record)
if res
data << res
records += 1
end
end
else
format_proc = generate_format_proc
Expand Down
35 changes: 35 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,41 @@ def waiting(seconds)
end
end

data(:handle_stream_simple => '',
:handle_stream_with_custom_format => 'tag,message')
test 'plugin using custom format can skip record chunk when format return nil' do |chunk_keys|
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', chunk_keys, @hash)]))
@i.register(:prefer_delayed_commit) { false }
@i.register(:format) { |tag, time, record|
if record['message'] == 'test1'
nil
else
[tag,time,record].to_msgpack
end
}
@i.register(:format_type_is_msgpack) { true }
@i.register(:write){ |chunk| e = []; chunk.each { |ta, t, r| e << [ta, t, r] }; events_from_chunk << [:write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "test1"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "test2"}],
]
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5) { sleep 0.1 until events_from_chunk.size == 1 }

assert_equal 1, events_from_chunk.size
assert_equal :write, events_from_chunk[0][0]
each_pushed = events_from_chunk[0][1]
assert_equal 1, each_pushed.size
assert_equal 'test.tag', each_pushed[0][0]
assert_equal "test2", each_pushed[0][2]['message']
end

test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do
events_from_chunk = []
@i = create_output(:custom)
Expand Down

0 comments on commit 56ae98b

Please sign in to comment.