Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable chunk.each only for limited plugins using msgpack #1263

Merged
merged 3 commits into from
Oct 7, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class Buffer # fluent/plugin/buffer is already loaded
class Chunk
include MonitorMixin
include UniqueId::Mixin
include ChunkMessagePackEventStreamer

# Chunks has 2 part:
# * metadata: contains metadata which should be restored after resume (if possible)
Expand Down
10 changes: 9 additions & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ def format(tag, time, record)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def formatted_to_msgpack_binary
# To indicate custom format method (#format) returns msgpack binary or not.
# If #format returns msgpack binary, override this method to return true.
false
end

def prefer_buffered_processing
# override this method to return false only when all of these are true:
# * plugin has both implementation for buffered and non-buffered methods
Expand Down Expand Up @@ -176,6 +182,7 @@ def initialize
@buffering = true
end
@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = false # decided later

@buffer = nil
@secondary = nil
Expand Down Expand Up @@ -340,6 +347,7 @@ def start
end

@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
@delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
prefer_delayed_commit
else
Expand Down Expand Up @@ -955,7 +963,7 @@ def try_flush
using_secondary = true
end

unless @custom_format
if @enable_msgpack_streamer
chunk.extend ChunkMessagePackEventStreamer
end

Expand Down
18 changes: 15 additions & 3 deletions test/plugin/test_buffer_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class BufferChunkTest < Test::Unit::TestCase
assert chunk.respond_to?(:read)
assert chunk.respond_to?(:open)
assert chunk.respond_to?(:write_to)
assert chunk.respond_to?(:msgpack_each)
assert_raise(NotImplementedError){ chunk.append([]) }
assert_raise(NotImplementedError){ chunk.concat(nil, 0) }
assert_raise(NotImplementedError){ chunk.commit }
Expand All @@ -43,7 +42,19 @@ class BufferChunkTest < Test::Unit::TestCase
assert_raise(NotImplementedError){ chunk.read }
assert_raise(NotImplementedError){ chunk.open(){} }
assert_raise(NotImplementedError){ chunk.write_to(nil) }
assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} }
assert !chunk.respond_to?(:msgpack_each)
end

test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)

assert !chunk.respond_to?(:each)
assert !chunk.respond_to?(:msgpack_each)

chunk.extend Fluent::ChunkMessagePackEventStreamer
assert chunk.respond_to?(:each)
assert chunk.respond_to?(:msgpack_each)
end

test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
Expand Down Expand Up @@ -162,9 +173,10 @@ def open(**kwargs)
assert "my data\nyour data\n", io.to_s
end

test 'can feed objects into blocks with unpacking msgpack' do
test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do
require 'msgpack'
c = TestChunk.new(Object.new)
c.extend Fluent::ChunkMessagePackEventStreamer
c.data << MessagePack.pack(['my data', 1])
c.data << MessagePack.pack(['your data', 2])
ary = []
Expand Down
113 changes: 113 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,47 @@ def shutdown
super
end
end
class DummyStandardBufferedOutput < DummyBareOutput
def initialize
super
@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
class DummyCustomFormatBufferedOutput < DummyBareOutput
def initialize
super
@format_type_is_msgpack = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who set this callback?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, I missed to add tests using custom formats.

@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def format(tag, time, record)
@format ? @format.call(tag, time, record) : [tag, time, record].to_json
end
def formatted_to_msgpack_binary
@format_type_is_msgpack ? @format_type_is_msgpack.call : false
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
class DummyFullFeatureOutput < DummyBareOutput
def initialize
super
Expand Down Expand Up @@ -94,6 +135,8 @@ def create_output(type=:full)
when :sync then FluentPluginOutputAsBufferedTest::DummySyncOutput.new
when :buffered then FluentPluginOutputAsBufferedTest::DummyAsyncOutput.new
when :delayed then FluentPluginOutputAsBufferedTest::DummyDelayedOutput.new
when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new
when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new
when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new
else
raise ArgumentError, "unknown type: #{type}"
Expand Down Expand Up @@ -125,6 +168,76 @@ def waiting(seconds)
Timecop.return
end

sub_test_case 'chunk feature in #write for output plugins' do
setup do
@stored_global_logger = $log
$log = Fluent::Test::TestLogger.new
@hash = {
'flush_mode' => 'immediate',
'flush_thread_interval' => '0.01',
'flush_thread_burst_interval' => '0.01',
}
end

teardown do
$log = @stored_global_logger
end

test 'plugin using standard format can iterate chunk for time, record in #write' do
events_from_chunk = []
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end

test 'plugin using standard format can iterate chunk for time, record in #try_write' do
events_from_chunk = []
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :try_write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end
end

sub_test_case 'buffered output configured with many chunk keys' do
setup do
@stored_global_logger = $log
Expand Down