diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index bf68d5d44a..02bf0e442a 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -850,8 +850,11 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) es.each do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= [] - meta_and_data[meta] << format(tag, time, record) - records += 1 + res = format(tag, time, record) + if res + meta_and_data[meta] << res + records += 1 + end end write_guard do @buffer.write(meta_and_data, enqueue: enqueue) @@ -885,8 +888,11 @@ def handle_stream_simple(tag, es, enqueue: false) records = 0 data = [] es.each do |time, record| - data << format(tag, time, record) - records += 1 + res = format(tag, time, record) + if res + data << res + records += 1 + end end else format_proc = generate_format_proc diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index aa11e23071..e8240706df 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -339,6 +339,41 @@ def waiting(seconds) end end + data(:handle_stream_simple => '', + :handle_stream_with_custom_format => 'tag,message') + test 'plugin using custom format can skip record chunk when format return nil' do |chunk_keys| + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT', '', {}, [config_element('buffer', chunk_keys, @hash)])) + @i.register(:prefer_delayed_commit) { false } + @i.register(:format) { |tag, time, record| + if record['message'] == 'test1' + nil + else + [tag,time,record].to_msgpack + end + } + @i.register(:format_type_is_msgpack) { true } + @i.register(:write){ |chunk| e = []; chunk.each { |ta, t, r| e << [ta, t, r] }; events_from_chunk << [:write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "test1"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "test2"}], + ] + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5) { sleep 0.1 until events_from_chunk.size == 1 } + + assert_equal 1, events_from_chunk.size + assert_equal :write, events_from_chunk[0][0] + each_pushed = events_from_chunk[0][1] + assert_equal 1, each_pushed.size + assert_equal 'test.tag', each_pushed[0][0] + assert_equal "test2", each_pushed[0][2]['message'] + end + test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do events_from_chunk = [] @i = create_output(:custom)