Skip to content

Commit

Permalink
port out_exec to v0.14 API (example of time sliced output)
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed May 24, 2016
1 parent f3acf4c commit ba2586d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 42 deletions.
24 changes: 14 additions & 10 deletions lib/fluent/plugin/out_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@

require 'tempfile'

require 'fluent/output'
require 'fluent/plugin/output'
require 'fluent/config/error'
require 'fluent/plugin/exec_util'
require 'fluent/mixin' # for TimeFormatter

module Fluent
class ExecOutput < TimeSlicedOutput
Plugin.register_output('exec', self)
module Fluent::Plugin
class ExecOutput < Output
Fluent::Plugin.register_output('exec', self)

def initialize
super
@localtime = false
end
helpers :compat_parameters

desc 'The command (program) to execute. The exec plugin passes the path of a TSV file as the last argumen'
config_param :command, :string
Expand All @@ -47,14 +45,20 @@ def initialize
raise ConfigError, "Unsupported format '#{val}'" unless f
f
end
config_param :localtime, :bool, default: false
config_param :timezone, :string, default: nil

def compat_parameters_default_chunk_key
'time'
end

def configure(conf)
super

@formatter = case @format
when :tsv
if @keys.empty?
raise ConfigError, "keys option is required on exec output for tsv format"
raise Fluent::ConfigError, "keys option is required on exec output for tsv format"
end
ExecUtil::TSVFormatter.new(@keys)
when :json
Expand All @@ -65,7 +69,7 @@ def configure(conf)

if @time_key
if @time_format
tf = TimeFormatter.new(@time_format, @localtime, @timezone)
tf = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone)
@time_format_proc = tf.method(:format)
else
@time_format_proc = Proc.new { |time| time.to_s }
Expand Down
85 changes: 53 additions & 32 deletions test/plugin/test_out_exec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require_relative '../helper'
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_exec'
require 'fileutils'

Expand Down Expand Up @@ -32,7 +32,7 @@ def setup

def create_driver(conf = TSV_CONFIG)
config = CONFIG + conf
Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::ExecOutput).configure(config, true)
Fluent::Test::Driver::Output.new(Fluent::Plugin::ExecOutput).configure(config)
end

def create_test_case
Expand All @@ -51,42 +51,60 @@ def test_configure
assert_equal true, d.instance.localtime
end

def test_configure_with_compat_buffer_parameters
conf = TSV_CONFIG + %[
buffer_type memory
time_slice_format %Y%m%d%H
num_threads 5
buffer_chunk_limit 50m
buffer_queue_limit 128
flush_at_shutdown yes
]
d = create_driver(conf)
assert_equal 3600, d.instance.buffer_config.timekey_range
assert_equal 5, d.instance.buffer_config.flush_threads
assert_equal 50*1024*1024, d.instance.buffer.chunk_bytes_limit
assert_equal 128, d.instance.buffer.queue_length_limit
assert d.instance.buffer_config.flush_at_shutdown
end

def test_format
d = create_driver
time, tests = create_test_case

tests.each { |test|
d.emit(test, time)
}

d.expect_format %[2011-01-02 13:14:15\ttest\tv1\n]
d.expect_format %[2011-01-02 13:14:15\ttest\tv2\n]
d.run(default_tag: 'test') do
d.feed(time, tests[0])
d.feed(time, tests[1])
end

d.run
assert_equal %[2011-01-02 13:14:15\ttest\tv1\n], d.formatted[0]
assert_equal %[2011-01-02 13:14:15\ttest\tv2\n], d.formatted[1]
end

def test_format_json
d = create_driver("format json")
time, tests = create_test_case

tests.each { |test|
d.emit(test, time)
d.expect_format Yajl.dump(test) + "\n"
}
d.run(default_tag: 'test') do
d.feed(time, tests[0])
d.feed(time, tests[1])
end

d.run
assert_equal Yajl.dump(tests[0]) + "\n", d.formatted[0]
assert_equal Yajl.dump(tests[1]) + "\n", d.formatted[1]
end

def test_format_msgpack
d = create_driver("format msgpack")
time, tests = create_test_case

tests.each { |test|
d.emit(test, time)
d.expect_format test.to_msgpack
}
d.run(default_tag: 'test') do
d.feed(time, tests[0])
d.feed(time, tests[1])
end

d.run
assert_equal tests[0].to_msgpack, d.formatted[0]
assert_equal tests[1].to_msgpack, d.formatted[1]
end

def test_format_time
Expand All @@ -98,30 +116,33 @@ def test_format_time
]
d = create_driver(config)

time = Fluent::EventTime::from_time(Time.parse("2011-01-02 13:14:15.123"))
time = event_time("2011-01-02 13:14:15.123")
tests = [{"k1"=>"v1","kx"=>"vx"}, {"k1"=>"v2","kx"=>"vx"}]

tests.each { |test|
d.emit(test, time)
}

d.expect_format %[2011-01-02 13:14:15.123\ttest\tv1\n]
d.expect_format %[2011-01-02 13:14:15.123\ttest\tv2\n]
d.run(default_tag: 'test') do
d.feed(time, tests[0])
d.feed(time, tests[1])
end

d.run
assert_equal %[2011-01-02 13:14:15.123\ttest\tv1\n], d.formatted[0]
assert_equal %[2011-01-02 13:14:15.123\ttest\tv2\n], d.formatted[1]
end

def test_write
d = create_driver
time, tests = create_test_case

tests.each { |test|
d.emit(test, time)
}

d.run
d.run(default_tag: 'test', flush: true) do
d.feed(time, tests[0])
d.feed(time, tests[1])
end

expect_path = "#{TMP_DIR}/out"

waiting(10, plugin: d.instance) do
sleep(0.1) until File.exist?(expect_path)
end

assert_equal true, File.exist?(expect_path)

data = File.read(expect_path)
Expand Down

0 comments on commit ba2586d

Please sign in to comment.