Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 28 additions & 33 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ def initialize
config_param :enable_watch_timer, :bool, default: true
desc 'Enable the stat watcher based on inotify.'
config_param :enable_stat_watcher, :bool, default: true
desc 'The encoding after conversion of the input.'
config_param :encoding, :string, default: nil
desc 'The encoding of the input.'
config_param :encoding, :string, default: nil
desc "The original encoding of the input. If set, in_tail tries to encode string from this to 'encoding'. Must be set with 'encoding'. "
config_param :from_encoding, :string, default: nil
desc 'Add the log path being tailed to records. Specify the field name to be used.'
config_param :path_key, :string, default: nil
Expand Down Expand Up @@ -828,17 +828,27 @@ def statistics
private

def io_handler(watcher, path)
TailWatcher::IOHandler.new(
watcher,
opts = {
path: path,
log: log,
read_lines_limit: @read_lines_limit,
read_bytes_limit_per_second: @read_bytes_limit_per_second,
open_on_every_update: @open_on_every_update,
from_encoding: @from_encoding,
encoding: @encoding,
metrics: @metrics,
max_line_size: @max_line_size,
}
unless @encoding.nil?
if @from_encoding.nil?
opts[:encoding] = @encoding
else
opts[:encoding] = @from_encoding
opts[:encoding_to_convert] = @encoding
end
end

TailWatcher::IOHandler.new(
watcher,
**opts,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -1031,46 +1041,30 @@ def swap_state(pe)
end

class FIFO
def initialize(from_encoding, encoding, log, max_line_size=nil)
@from_encoding = from_encoding
@encoding = encoding
@need_enc = from_encoding != encoding
@buffer = ''.force_encoding(from_encoding)
@eol = "\n".encode(from_encoding).freeze
def initialize(encoding, log, max_line_size=nil, encoding_to_convert=nil)
@buffer = ''.force_encoding(encoding)
@eol = "\n".encode(encoding).freeze
@encoding_to_convert = encoding_to_convert
@max_line_size = max_line_size
@skip_current_line = false
@skipping_current_line_bytesize = 0
@log = log
end

attr_reader :from_encoding, :encoding, :buffer, :max_line_size
attr_reader :buffer, :max_line_size

def <<(chunk)
  # Appends "chunk" to the internal buffer, treating its bytes as
  # "from_encoding", and hands "chunk" back with its original encoding.
  #
  # "chunk" may be a buffer the caller reuses (per the original note it comes
  # from IO#readpartial, which returns the string passed as its second
  # argument), so its encoding attribute must not stay changed once this
  # method returns. String#force_encoding only relabels the string; the
  # bytes themselves are left untouched.
  #
  # Historical note: passing a string tagged with a double-byte or quad-byte
  # encoding as the second argument of IO#readpartial reportedly triggers an
  # assertion failure on Ruby < 2.4.0, which is why the original encoding is
  # restored before returning.
  saved_encoding = chunk.encoding
  @buffer << chunk.force_encoding(from_encoding)
  # Restore the caller-visible encoding; this is also the return value.
  chunk.force_encoding(saved_encoding)
end

def convert(s)
if @need_enc
s.encode!(@encoding, @from_encoding)
if @encoding_to_convert
s.encode!(@encoding_to_convert)
else
s
end
rescue
s.encode!(@encoding, @from_encoding, :invalid => :replace, :undef => :replace)
s.encode!(@encoding_to_convert, :invalid => :replace, :undef => :replace)
end

def read_lines(lines)
Expand Down Expand Up @@ -1136,14 +1130,15 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, encoding: Encoding::ASCII_8BIT, encoding_to_convert: nil, metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size)
@encoding = encoding
@fifo = FIFO.new(encoding, log, max_line_size, encoding_to_convert)
@lines = []
@io = nil
@notify_mutex = Mutex.new
Expand Down Expand Up @@ -1225,7 +1220,7 @@ def handle_notify
end

with_io do |io|
iobuf = ''.force_encoding('ASCII-8BIT')
iobuf = ''.force_encoding(@encoding)
begin
read_more = false
has_skipped_line = false
Expand Down
22 changes: 11 additions & 11 deletions test/plugin/in_tail/test_fifo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class IntailFIFO < Test::Unit::TestCase
sub_test_case '#read_line' do
test 'returns lines splitting per `\n`' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'concat line when line is separated' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'returns lines which convert encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, nil, Encoding::UTF_8)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reads lines as from_encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case 'when it includes multi byte chars' do
test 'handles it as ascii_8bit' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'replaces character with ? when convert error happens' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT)
text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'returns nothing when buffer is empty' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
lines = []
fifo.read_lines(lines)
assert_equal [], lines
Expand Down Expand Up @@ -117,7 +117,7 @@ class IntailFIFO < Test::Unit::TestCase
])
test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)|
max_line_size = 5
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

input_texts.each do |text|
Expand All @@ -133,7 +133,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case '#<<' do
test 'does not make any change about encoding to an argument' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)

assert_equal Encoding::UTF_8, text.encoding
Expand All @@ -144,7 +144,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case '#reading_bytesize' do
test 'returns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
text = "test\n" * 3 + 'test'
fifo << text

Expand All @@ -163,7 +163,7 @@ class IntailFIFO < Test::Unit::TestCase

test 'returns the entire line size even if the size is over max_line_size' do
max_line_size = 20
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

text = "long line still not having EOL"
Expand Down
16 changes: 16 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,22 @@ def test_encoding_with_bad_character
assert_equal(Encoding::UTF_8, events[0][2]['message'].encoding)
end

def test_encoding_for_regular_expression_parsing
  # With only "encoding" set to utf-8 (no from_encoding), a regexp parser
  # whose pattern contains multi-byte characters must match the tailed line
  # and yield the named capture.
  parse_section = config_element("parse", "", { "@type" => "/^あ(?<name>.*)お$/" })
  conf = CONFIG_READ_FROM_HEAD +
         config_element("", "", { "encoding" => "utf-8" }, [parse_section])

  d = create_driver(conf)
  d.run(expect_emits: 1, timeout: 5) do
    # Written in binary mode so the bytes reach the file unmodified.
    Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f|
      f.puts "あいうえお"
    end
  end

  events = d.events
  assert_equal(true, events.length > 0)
  # The third element of the first event is compared against the parsed capture.
  assert_equal({"name" => "いうえ"}, events[0][2])
end

sub_test_case "multiline" do
data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
Expand Down
Loading