Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fluent-cat: support secondary [Fluent::EventTime, {}] record too #3368

Merged
merged 1 commit into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,30 @@ def initialize(tag, connector, time_as_integer: false, retry_limit: 5)
super()
end

def secondary_record?(record)
record.class != Hash &&
record.size == 2 &&
record.first.class == Fluent::EventTime &&
record.last.class == Hash
end

def write(record)
if record.class != Hash
raise ArgumentError, "Input must be a map (got #{record.class})"
unless secondary_record?(record)
if record.class != Hash
raise ArgumentError, "Input must be a map (got #{record.class})"
end
end

time = Fluent::EventTime.now
time = time.to_i if @time_as_integer
entry = [time, record]
entry = if secondary_record?(record)
# Even though secondary contains Fluent::EventTime in record,
# fluent-cat just ignore it and set Fluent::EventTime.now instead.
# This specification is adopted to keep consistency.
[time, record.last]
else
[time, record]
end
synchronize {
unless write_impl([entry])
# write failed
Expand Down
96 changes: 96 additions & 0 deletions test/command/test_cat.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
require_relative '../helper'

require 'test-unit'
require 'open3'
require 'fluent/plugin/output'
require 'fluent/plugin/in_forward'
require 'fluent/plugin/out_secondary_file'
require 'fluent/test/driver/output'
require 'fluent/test/driver/input'

class TestFluentCat < ::Test::Unit::TestCase
def setup
Fluent::Test.setup
FileUtils.mkdir_p(TMP_DIR)
@record = { 'key' => 'value' }
@time = event_time
@es = Fluent::OneEventStream.new(@time, @record)
@primary = create_primary
metadata = @primary.buffer.new_metadata
@chunk = create_chunk(@primary, metadata, @es)
end

def teardown
FileUtils.rm_rf(TMP_DIR)
end

TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluent_cat#{ENV['TEST_ENV_NUMBER']}")
FLUENT_CAT_COMMAND = File.expand_path(File.dirname(__FILE__) + "/../../bin/fluent-cat")

PORT = unused_port
CONFIG = %[
port #{PORT}
bind 127.0.0.1
]

SECONDARY_CONFIG = %[
directory #{TMP_DIR}
]

class DummyOutput < Fluent::Plugin::Output
def write(chunk); end
end

def create_driver(conf=CONFIG)
Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput).configure(conf)
end

def create_primary(buffer_cofig = config_element('buffer'))
DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig]))
end

def create_secondary_driver(conf=SECONDARY_CONFIG)
c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
c.instance.acts_as_secondary(@primary)
c.configure(conf)
end

def create_chunk(primary, metadata, es)
primary.buffer.generate_chunk(metadata).tap do |c|
c.concat(es.to_msgpack_stream, es.size)
c.commit
end
end

sub_test_case "json" do
def test_cat_json
d = create_driver
d.run(expect_records: 1) do
Open3.pipeline_w("ruby #{FLUENT_CAT_COMMAND} --port #{PORT} json") do |stdin|
stdin.puts('{"key":"value"}')
stdin.close
end
end
event = d.events.first
assert_equal([1, "json", @record],
[d.events.size, event.first, event.last])
end
end

sub_test_case "msgpack" do
def test_cat_secondary_file
d = create_secondary_driver
path = d.instance.write(@chunk)
d = create_driver
d.run(expect_records: 1) do
Open3.pipeline_w("ruby #{FLUENT_CAT_COMMAND} --port #{PORT} --format msgpack secondary") do |stdin|
stdin.write(File.read(path))
stdin.close
end
end
event = d.events.first
assert_equal([1, "secondary", @record],
[d.events.size, event.first, event.last])
end
end
end