diff --git a/lib/fluent/plugin/out_exec.rb b/lib/fluent/plugin/out_exec.rb index 653686bf3e..a0ae008def 100644 --- a/lib/fluent/plugin/out_exec.rb +++ b/lib/fluent/plugin/out_exec.rb @@ -16,18 +16,16 @@ require 'tempfile' -require 'fluent/output' +require 'fluent/plugin/output' require 'fluent/config/error' require 'fluent/plugin/exec_util' +require 'fluent/mixin' # for TimeFormatter -module Fluent - class ExecOutput < TimeSlicedOutput - Plugin.register_output('exec', self) +module Fluent::Plugin + class ExecOutput < Output + Fluent::Plugin.register_output('exec', self) - def initialize - super - @localtime = false - end + helpers :compat_parameters desc 'The command (program) to execute. The exec plugin passes the path of a TSV file as the last argumen' config_param :command, :string @@ -47,6 +45,12 @@ def initialize raise ConfigError, "Unsupported format '#{val}'" unless f f end + config_param :localtime, :bool, default: false + config_param :timezone, :string, default: nil + + def compat_parameters_default_chunk_key + 'time' + end def configure(conf) super @@ -54,7 +58,7 @@ def configure(conf) @formatter = case @format when :tsv if @keys.empty? - raise ConfigError, "keys option is required on exec output for tsv format" + raise Fluent::ConfigError, "keys option is required on exec output for tsv format" end ExecUtil::TSVFormatter.new(@keys) when :json @@ -65,7 +69,7 @@ def configure(conf) if @time_key if @time_format - tf = TimeFormatter.new(@time_format, @localtime, @timezone) + tf = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone) @time_format_proc = tf.method(:format) else @time_format_proc = Proc.new { |time| time.to_s } diff --git a/test/plugin/test_out_exec.rb b/test/plugin/test_out_exec.rb index 1fa2ceb293..26f18b0a12 100644 --- a/test/plugin/test_out_exec.rb +++ b/test/plugin/test_out_exec.rb @@ -1,5 +1,5 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/output' require 'fluent/plugin/out_exec' require 'fileutils' @@ -32,7 +32,7 @@ def setup def create_driver(conf = TSV_CONFIG) config = CONFIG + conf - Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::ExecOutput).configure(config, true) + Fluent::Test::Driver::Output.new(Fluent::Plugin::ExecOutput).configure(config) end def create_test_case @@ -51,42 +51,60 @@ def test_configure assert_equal true, d.instance.localtime end + def test_configure_with_compat_buffer_parameters + conf = TSV_CONFIG + %[ + buffer_type memory + time_slice_format %Y%m%d%H + num_threads 5 + buffer_chunk_limit 50m + buffer_queue_limit 128 + flush_at_shutdown yes + ] + d = create_driver(conf) + assert_equal 3600, d.instance.buffer_config.timekey_range + assert_equal 5, d.instance.buffer_config.flush_threads + assert_equal 50*1024*1024, d.instance.buffer.chunk_bytes_limit + assert_equal 128, d.instance.buffer.queue_length_limit + assert d.instance.buffer_config.flush_at_shutdown + end + def test_format d = create_driver time, tests = create_test_case - tests.each { |test| - d.emit(test, time) - } - - d.expect_format %[2011-01-02 13:14:15\ttest\tv1\n] - d.expect_format %[2011-01-02 13:14:15\ttest\tv2\n] + d.run(default_tag: 'test') do + d.feed(time, tests[0]) + d.feed(time, tests[1]) + end - d.run + assert_equal %[2011-01-02 13:14:15\ttest\tv1\n], d.formatted[0] + assert_equal %[2011-01-02 13:14:15\ttest\tv2\n], d.formatted[1] end def test_format_json d = create_driver("format json") time, tests = create_test_case - tests.each { |test| - d.emit(test, time) - d.expect_format Yajl.dump(test) + "\n" - } + d.run(default_tag: 'test') do + d.feed(time, tests[0]) + d.feed(time, tests[1]) + end - d.run + assert_equal Yajl.dump(tests[0]) + "\n", d.formatted[0] + assert_equal Yajl.dump(tests[1]) + "\n", d.formatted[1] end def test_format_msgpack d = create_driver("format msgpack") time, tests = create_test_case - tests.each { |test| - d.emit(test, time) - d.expect_format test.to_msgpack - } + d.run(default_tag: 'test') do + d.feed(time, tests[0]) + d.feed(time, tests[1]) + end - d.run + assert_equal tests[0].to_msgpack, d.formatted[0] + assert_equal tests[1].to_msgpack, d.formatted[1] end def test_format_time @@ -98,30 +116,33 @@ def test_format_time ] d = create_driver(config) - time = Fluent::EventTime::from_time(Time.parse("2011-01-02 13:14:15.123")) + time = event_time("2011-01-02 13:14:15.123") tests = [{"k1"=>"v1","kx"=>"vx"}, {"k1"=>"v2","kx"=>"vx"}] - tests.each { |test| - d.emit(test, time) - } - - d.expect_format %[2011-01-02 13:14:15.123\ttest\tv1\n] - d.expect_format %[2011-01-02 13:14:15.123\ttest\tv2\n] + d.run(default_tag: 'test') do + d.feed(time, tests[0]) + d.feed(time, tests[1]) + end - d.run + assert_equal %[2011-01-02 13:14:15.123\ttest\tv1\n], d.formatted[0] + assert_equal %[2011-01-02 13:14:15.123\ttest\tv2\n], d.formatted[1] end def test_write d = create_driver time, tests = create_test_case - tests.each { |test| - d.emit(test, time) - } - - d.run + d.run(default_tag: 'test', flush: true) do + d.feed(time, tests[0]) + d.feed(time, tests[1]) + end expect_path = "#{TMP_DIR}/out" + + waiting(10, plugin: d.instance) do + sleep(0.1) until File.exist?(expect_path) + end + assert_equal true, File.exist?(expect_path) data = File.read(expect_path)