From 0e9ac447a29fa225ebe574dde25b5e7b5f5c29a1 Mon Sep 17 00:00:00 2001 From: Kentaro Hayashi Date: Fri, 14 May 2021 18:50:32 +0900 Subject: [PATCH] fluent-cat: support secondary file https://docs.fluentd.org/output/secondary_file#how-to-resend-secondary-file explains that it supports secondary file to resend, but it get error when [Fluent::EventTime, Hash] format is given. Input must be a map (got Array) In this commit, check record content and support [Fluent::EventTime, Hash] format too. Note that fluent-cat ignore timestamp which is stored in secondary records. (it is intended behavior for keeping consistency) Signed-off-by: Kentaro Hayashi --- lib/fluent/command/cat.rb | 22 +++++++-- test/command/test_cat.rb | 96 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 test/command/test_cat.rb diff --git a/lib/fluent/command/cat.rb b/lib/fluent/command/cat.rb index c903c6e5b1..79b23bc6da 100644 --- a/lib/fluent/command/cat.rb +++ b/lib/fluent/command/cat.rb @@ -152,14 +152,30 @@ def initialize(tag, connector, time_as_integer: false, retry_limit: 5) super() end + def secondary_record?(record) + record.class != Hash && + record.size == 2 && + record.first.class == Fluent::EventTime && + record.last.class == Hash + end + def write(record) - if record.class != Hash - raise ArgumentError, "Input must be a map (got #{record.class})" + unless secondary_record?(record) + if record.class != Hash + raise ArgumentError, "Input must be a map (got #{record.class})" + end end time = Fluent::EventTime.now time = time.to_i if @time_as_integer - entry = [time, record] + entry = if secondary_record?(record) + # Even though secondary contains Fluent::EventTime in record, + # fluent-cat just ignore it and set Fluent::EventTime.now instead. + # This specification is adopted to keep consistency. + [time, record.last] + else + [time, record] + end synchronize { unless write_impl([entry]) # write failed diff --git a/test/command/test_cat.rb b/test/command/test_cat.rb new file mode 100644 index 0000000000..33c9b7faf5 --- /dev/null +++ b/test/command/test_cat.rb @@ -0,0 +1,96 @@ +require_relative '../helper' + +require 'test-unit' +require 'open3' +require 'fluent/plugin/output' +require 'fluent/plugin/in_forward' +require 'fluent/plugin/out_secondary_file' +require 'fluent/test/driver/output' +require 'fluent/test/driver/input' + +class TestFluentCat < ::Test::Unit::TestCase + def setup + Fluent::Test.setup + FileUtils.mkdir_p(TMP_DIR) + @record = { 'key' => 'value' } + @time = event_time + @es = Fluent::OneEventStream.new(@time, @record) + @primary = create_primary + metadata = @primary.buffer.new_metadata + @chunk = create_chunk(@primary, metadata, @es) + end + + def teardown + FileUtils.rm_rf(TMP_DIR) + end + + TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluent_cat#{ENV['TEST_ENV_NUMBER']}") + FLUENT_CAT_COMMAND = File.expand_path(File.dirname(__FILE__) + "/../../bin/fluent-cat") + + PORT = unused_port + CONFIG = %[ + port #{PORT} + bind 127.0.0.1 + ] + + SECONDARY_CONFIG = %[ + directory #{TMP_DIR} + ] + + class DummyOutput < Fluent::Plugin::Output + def write(chunk); end + end + + def create_driver(conf=CONFIG) + Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput).configure(conf) + end + + def create_primary(buffer_cofig = config_element('buffer')) + DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig])) + end + + def create_secondary_driver(conf=SECONDARY_CONFIG) + c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) + c.instance.acts_as_secondary(@primary) + c.configure(conf) + end + + def create_chunk(primary, metadata, es) + primary.buffer.generate_chunk(metadata).tap do |c| + c.concat(es.to_msgpack_stream, es.size) + c.commit + end + end + + sub_test_case "json" do + def test_cat_json + d = create_driver + d.run(expect_records: 1) do + Open3.pipeline_w("ruby #{FLUENT_CAT_COMMAND} --port #{PORT} json") do |stdin| + stdin.puts('{"key":"value"}') + stdin.close + end + end + event = d.events.first + assert_equal([1, "json", @record], + [d.events.size, event.first, event.last]) + end + end + + sub_test_case "msgpack" do + def test_cat_secondary_file + d = create_secondary_driver + path = d.instance.write(@chunk) + d = create_driver + d.run(expect_records: 1) do + Open3.pipeline_w("ruby #{FLUENT_CAT_COMMAND} --port #{PORT} --format msgpack secondary") do |stdin| + stdin.write(File.read(path)) + stdin.close + end + end + event = d.events.first + assert_equal([1, "secondary", @record], + [d.events.size, event.first, event.last]) + end + end +end