Skip to content

Commit

Permalink
Merge pull request #1469 from fluent/skip-nil-from-format
Browse files Browse the repository at this point in the history
Skip record when Output#format returns nil
  • Loading branch information
repeatedly authored Feb 16, 2017
2 parents 3e3ff24 + 56ae98b commit 101937d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
14 changes: 10 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,11 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
es.each do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= []
meta_and_data[meta] << format(tag, time, record)
records += 1
res = format(tag, time, record)
if res
meta_and_data[meta] << res
records += 1
end
end
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
Expand Down Expand Up @@ -885,8 +888,11 @@ def handle_stream_simple(tag, es, enqueue: false)
records = 0
data = []
es.each do |time, record|
data << format(tag, time, record)
records += 1
res = format(tag, time, record)
if res
data << res
records += 1
end
end
else
format_proc = generate_format_proc
Expand Down
35 changes: 35 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,41 @@ def waiting(seconds)
end
end

data(:handle_stream_simple => '',
:handle_stream_with_custom_format => 'tag,message')
test 'plugin using custom format can skip record chunk when format return nil' do |chunk_keys|
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', chunk_keys, @hash)]))
@i.register(:prefer_delayed_commit) { false }
@i.register(:format) { |tag, time, record|
if record['message'] == 'test1'
nil
else
[tag,time,record].to_msgpack
end
}
@i.register(:format_type_is_msgpack) { true }
@i.register(:write){ |chunk| e = []; chunk.each { |ta, t, r| e << [ta, t, r] }; events_from_chunk << [:write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "test1"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "test2"}],
]
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5) { sleep 0.1 until events_from_chunk.size == 1 }

assert_equal 1, events_from_chunk.size
assert_equal :write, events_from_chunk[0][0]
each_pushed = events_from_chunk[0][1]
assert_equal 1, each_pushed.size
assert_equal 'test.tag', each_pushed[0][0]
assert_equal "test2", each_pushed[0][2]['message']
end

test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do
events_from_chunk = []
@i = create_output(:custom)
Expand Down

0 comments on commit 101937d

Please sign in to comment.