Skip to content

Commit

Permalink
Merge pull request #974 from fluent/bare-output
Browse files Browse the repository at this point in the history
add Fluent::Plugin::BareOutput to support v0.12 MultiOutput plugins
  • Loading branch information
tagomoris committed May 23, 2016
2 parents ce372dd + fd18dbb commit d5476d7
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 46 deletions.
11 changes: 5 additions & 6 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

require 'fluent/plugin'
require 'fluent/plugin/output'
require 'fluent/plugin/multi_output'
require 'fluent/plugin/bare_output'
require 'fluent/compat/call_super_mixin'
require 'fluent/compat/output_chain'
require 'fluent/timezone'
Expand Down Expand Up @@ -149,11 +149,10 @@ def initialize
end
end

class MultiOutput < Fluent::Plugin::MultiOutput
def initialize
super
@compat = true
end
class MultiOutput < Fluent::Plugin::BareOutput
# TODO: warn when deprecated

helpers :event_emitter

def process(tag, es)
emit(tag, es, NULL_OUTPUT_CHAIN)
Expand Down
63 changes: 63 additions & 0 deletions lib/fluent/plugin/bare_output.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/base'

require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'

module Fluent
module Plugin
class BareOutput < Base
# DO NOT USE THIS plugin for normal output plugin. Use Output instead.
# This output plugin base class is only for meta-output plugins
# which cannot be implemented on MultiOutput.
# E.g,: forest, config-expander

include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin

attr_reader :num_errors, :emit_count, :emit_records

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def initialize
super
@counters_monitor = Monitor.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
@emit_records = 0
end

def emit_sync(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
begin
process(tag, es)
@counters_monitor.synchronize{ @emit_records += es.size }
rescue
@counters_monitor.synchronize{ @num_errors += 1 }
raise
end
end
alias :emit_events :emit_sync
end
end
end
9 changes: 4 additions & 5 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class MultiOutput < Base

attr_reader :outputs

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def initialize
super
@outputs = []
Expand All @@ -52,9 +56,6 @@ def initialize
def configure(conf)
super

# v0.12 MultiOutput does nothing about initializing stores, and plugin implementation did it.
return if @compat

@stores.each do |store|
store_conf = store.corresponding_config_element
type = store_conf['@type']
Expand Down Expand Up @@ -89,8 +90,6 @@ def emit_sync(tag, es)
end
end
alias :emit_events :emit_sync

# def process(tag, es)
end
end
end
118 changes: 118 additions & 0 deletions test/plugin/test_bare_output.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
require_relative '../helper'
require 'fluent/plugin/bare_output'
require 'fluent/event'

module FluentPluginBareOutputTest
class DummyPlugin < Fluent::Plugin::BareOutput
attr_reader :store
def initialize
super
@store = []
end
def process(tag, es)
es.each do |time, record|
@store << [tag, time, record]
end
end
end
end

class BareOutputTest < Test::Unit::TestCase
setup do
Fluent::Test.setup
@p = FluentPluginBareOutputTest::DummyPlugin.new
end

test 'has healthy lifecycle' do
assert !@p.configured?
@p.configure(config_element())
assert @p.configured?

assert !@p.started?
@p.start
assert @p.start

assert !@p.stopped?
@p.stop
assert @p.stopped?

assert !@p.before_shutdown?
@p.before_shutdown
assert @p.before_shutdown?

assert !@p.shutdown?
@p.shutdown
assert @p.shutdown?

assert !@p.after_shutdown?
@p.after_shutdown
assert @p.after_shutdown?

assert !@p.closed?
@p.close
assert @p.closed?

assert !@p.terminated?
@p.terminate
assert @p.terminated?
end

test 'has plugin_id automatically generated' do
assert @p.respond_to?(:plugin_id_configured?)
assert @p.respond_to?(:plugin_id)

@p.configure(config_element())

assert !@p.plugin_id_configured?
assert @p.plugin_id
assert{ @p.plugin_id != 'mytest' }
end

test 'has plugin_id manually configured' do
@p.configure(config_element('ROOT', '', {'@id' => 'mytest'}))
assert @p.plugin_id_configured?
assert_equal 'mytest', @p.plugin_id
end

test 'has plugin logger' do
assert @p.respond_to?(:log)
assert @p.log

# default logger
original_logger = @p.log

@p.configure(config_element('ROOT', '', {'@log_level' => 'debug'}))

assert{ @p.log.object_id != original_logger.object_id }
assert_equal Fluent::Log::LEVEL_DEBUG, @p.log.level
end

test 'can load plugin helpers' do
assert_nothing_raised do
class FluentPluginBareOutputTest::DummyPlugin2 < Fluent::Plugin::BareOutput
helpers :storage
end
end
end

test 'can get input event stream to write' do
@p.configure(config_element('ROOT'))
@p.start

es1 = Fluent::OneEventStream.new(event_time('2016-05-21 18:37:31 +0900'), {'k1' => 'v1'})
es2 = Fluent::ArrayEventStream.new([
[event_time('2016-05-21 18:38:33 +0900'), {'k2' => 'v2'}],
[event_time('2016-05-21 18:39:10 +0900'), {'k3' => 'v3'}],
])
@p.emit_events('mytest1', es1)
@p.emit_events('mytest2', es2)

all_events = [
['mytest1', event_time('2016-05-21 18:37:31 +0900'), {'k1' => 'v1'}],
['mytest2', event_time('2016-05-21 18:38:33 +0900'), {'k2' => 'v2'}],
['mytest2', event_time('2016-05-21 18:39:10 +0900'), {'k3' => 'v3'}],
]

assert_equal all_events, @p.store
end
end
36 changes: 1 addition & 35 deletions test/plugin/test_multi_output.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/multi_output'
require 'fluent/event'

require 'json'
Expand Down Expand Up @@ -177,38 +177,4 @@ def create_output(type=:multi)
assert_equal 2, @i.events.size
end
end

sub_test_case 'compat multi output plugin' do
setup do
Fluent::Test.setup
@i = create_output(:compat_multi)
end

teardown do
@i.log.out.reset
end

test '#configure raises error if <store> sections are missing' do
conf = config_element('ROOT', '', { '@type' => 'dummy_test_multi_output' }, [])
assert_raise Fluent::ConfigError do
@i.configure(conf)
end
end

test '#configure does NOT initialize child plugins' do
assert_equal [], @i.outputs

conf = config_element('ROOT', '', { '@type' => 'dummy_test_multi_output' },
[
config_element('store', '', { '@type' => 'dummy_test_multi_output_1' }),
config_element('store', '', { '@type' => 'dummy_test_multi_output_2' }),
config_element('store', '', { '@type' => 'dummy_test_multi_output_3' }),
config_element('store', '', { '@type' => 'dummy_test_multi_output_4' }),
]
)
@i.configure(conf)

assert_equal [], @i.outputs
end
end
end

0 comments on commit d5476d7

Please sign in to comment.