Skip to content

Commit 619e6b0

Browse files
Watson1978 and daipom
authored
in_tail: fix handling of encoding parameter (#5010)
**Which issue(s) this PR fixes**: Fixes #4863 **What this PR does / why we need it**: If only `encoding` is configured (without `from_encoding`), in_tail converts the string encoding unexpectedly, which might corrupt the string. This PR fixes that behavior so that the string is handled correctly. As a result, the behavior is aligned with what is documented in https://docs.fluentd.org/input/tail#encoding-from_encoding . **Docs Changes**: Not needed. **Release Note**: in_tail: fixed an issue where specifying only the `encoding` parameter might cause data corruption (affected versions: since v0.14.12). --------- Signed-off-by: Shizuo Fujita <fujita@clear-code.com> Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com> Co-authored-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent 7b4d2d5 commit 619e6b0

File tree

3 files changed

+55
-44
lines changed

3 files changed

+55
-44
lines changed

lib/fluent/plugin/in_tail.rb

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ def initialize
9696
config_param :enable_watch_timer, :bool, default: true
9797
desc 'Enable the stat watcher based on inotify.'
9898
config_param :enable_stat_watcher, :bool, default: true
99-
desc 'The encoding after conversion of the input.'
100-
config_param :encoding, :string, default: nil
10199
desc 'The encoding of the input.'
100+
config_param :encoding, :string, default: nil
101+
desc "The original encoding of the input. If set, in_tail tries to encode string from this to 'encoding'. Must be set with 'encoding'. "
102102
config_param :from_encoding, :string, default: nil
103103
desc 'Add the log path being tailed to records. Specify the field name to be used.'
104104
config_param :path_key, :string, default: nil
@@ -828,17 +828,27 @@ def statistics
828828
private
829829

830830
def io_handler(watcher, path)
831-
TailWatcher::IOHandler.new(
832-
watcher,
831+
opts = {
833832
path: path,
834833
log: log,
835834
read_lines_limit: @read_lines_limit,
836835
read_bytes_limit_per_second: @read_bytes_limit_per_second,
837836
open_on_every_update: @open_on_every_update,
838-
from_encoding: @from_encoding,
839-
encoding: @encoding,
840837
metrics: @metrics,
841838
max_line_size: @max_line_size,
839+
}
840+
unless @encoding.nil?
841+
if @from_encoding.nil?
842+
opts[:encoding] = @encoding
843+
else
844+
opts[:encoding] = @from_encoding
845+
opts[:encoding_to_convert] = @encoding
846+
end
847+
end
848+
849+
TailWatcher::IOHandler.new(
850+
watcher,
851+
**opts,
842852
&method(:receive_lines)
843853
)
844854
end
@@ -1031,46 +1041,30 @@ def swap_state(pe)
10311041
end
10321042

10331043
class FIFO
1034-
def initialize(from_encoding, encoding, log, max_line_size=nil)
1035-
@from_encoding = from_encoding
1036-
@encoding = encoding
1037-
@need_enc = from_encoding != encoding
1038-
@buffer = ''.force_encoding(from_encoding)
1039-
@eol = "\n".encode(from_encoding).freeze
1044+
def initialize(encoding, log, max_line_size=nil, encoding_to_convert=nil)
1045+
@buffer = ''.force_encoding(encoding)
1046+
@eol = "\n".encode(encoding).freeze
1047+
@encoding_to_convert = encoding_to_convert
10401048
@max_line_size = max_line_size
10411049
@skip_current_line = false
10421050
@skipping_current_line_bytesize = 0
10431051
@log = log
10441052
end
10451053

1046-
attr_reader :from_encoding, :encoding, :buffer, :max_line_size
1054+
attr_reader :buffer, :max_line_size
10471055

10481056
def <<(chunk)
1049-
# Although "chunk" is most likely transient besides String#force_encoding itself
1050-
# won't affect the actual content of it, it is also probable that "chunk" is
1051-
# a reused buffer and changing its encoding causes some problems on the caller side.
1052-
#
1053-
# Actually, the caller here is specific and "chunk" comes from IO#partial with
1054-
# the second argument, which the function always returns as a return value.
1055-
#
1056-
# Feeding a string that has its encoding attribute set to any double-byte or
1057-
# quad-byte encoding to IO#readpartial as the second arguments results in an
1058-
# assertion failure on Ruby < 2.4.0 for unknown reasons.
1059-
orig_encoding = chunk.encoding
1060-
chunk.force_encoding(from_encoding)
10611057
@buffer << chunk
1062-
# Thus the encoding needs to be reverted back here
1063-
chunk.force_encoding(orig_encoding)
10641058
end
10651059

10661060
def convert(s)
1067-
if @need_enc
1068-
s.encode!(@encoding, @from_encoding)
1061+
if @encoding_to_convert
1062+
s.encode!(@encoding_to_convert)
10691063
else
10701064
s
10711065
end
10721066
rescue
1073-
s.encode!(@encoding, @from_encoding, :invalid => :replace, :undef => :replace)
1067+
s.encode!(@encoding_to_convert, :invalid => :replace, :undef => :replace)
10741068
end
10751069

10761070
def read_lines(lines)
@@ -1136,14 +1130,15 @@ class IOHandler
11361130

11371131
attr_accessor :shutdown_timeout
11381132

1139-
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
1133+
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, encoding: Encoding::ASCII_8BIT, encoding_to_convert: nil, metrics:, &receive_lines)
11401134
@watcher = watcher
11411135
@path = path
11421136
@read_lines_limit = read_lines_limit
11431137
@read_bytes_limit_per_second = read_bytes_limit_per_second
11441138
@receive_lines = receive_lines
11451139
@open_on_every_update = open_on_every_update
1146-
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size)
1140+
@encoding = encoding
1141+
@fifo = FIFO.new(encoding, log, max_line_size, encoding_to_convert)
11471142
@lines = []
11481143
@io = nil
11491144
@notify_mutex = Mutex.new
@@ -1225,7 +1220,7 @@ def handle_notify
12251220
end
12261221

12271222
with_io do |io|
1228-
iobuf = ''.force_encoding('ASCII-8BIT')
1223+
iobuf = ''.force_encoding(@encoding)
12291224
begin
12301225
read_more = false
12311226
has_skipped_line = false

test/plugin/in_tail/test_fifo.rb

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
class IntailFIFO < Test::Unit::TestCase
66
sub_test_case '#read_line' do
77
test 'returns lines splitting per `\n`' do
8-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
8+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
99
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
1010
fifo << text
1111
lines = []
@@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase
1515
end
1616

1717
test 'concat line when line is separated' do
18-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
18+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
1919
text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT)
2020
fifo << text
2121
lines = []
@@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase
3030
end
3131

3232
test 'returns lines which convert encoding' do
33-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log)
33+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, nil, Encoding::UTF_8)
3434
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
3535
fifo << text
3636
lines = []
@@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase
4040
end
4141

4242
test 'reads lines as from_encoding' do
43-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
43+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT)
4444
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
4545
fifo << text
4646
lines = []
@@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase
5151

5252
sub_test_case 'when it includes multi byte chars' do
5353
test 'handles it as ascii_8bit' do
54-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
54+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
5555
text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT)
5656
fifo << text
5757
lines = []
@@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase
6161
end
6262

6363
test 'replaces character with ? when convert error happens' do
64-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
64+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT)
6565
text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8)
6666
fifo << text
6767
lines = []
@@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase
7272
end
7373

7474
test 'returns nothing when buffer is empty' do
75-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
75+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
7676
lines = []
7777
fifo.read_lines(lines)
7878
assert_equal [], lines
@@ -117,7 +117,7 @@ class IntailFIFO < Test::Unit::TestCase
117117
])
118118
test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)|
119119
max_line_size = 5
120-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
120+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size)
121121
lines = []
122122

123123
input_texts.each do |text|
@@ -133,7 +133,7 @@ class IntailFIFO < Test::Unit::TestCase
133133

134134
sub_test_case '#<<' do
135135
test 'does not make any change about encoding to an argument' do
136-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
136+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
137137
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
138138

139139
assert_equal Encoding::UTF_8, text.encoding
@@ -144,7 +144,7 @@ class IntailFIFO < Test::Unit::TestCase
144144

145145
sub_test_case '#reading_bytesize' do
146146
test 'returns buffer size' do
147-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
147+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
148148
text = "test\n" * 3 + 'test'
149149
fifo << text
150150

@@ -163,7 +163,7 @@ class IntailFIFO < Test::Unit::TestCase
163163

164164
test 'returns the entire line size even if the size is over max_line_size' do
165165
max_line_size = 20
166-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
166+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size)
167167
lines = []
168168

169169
text = "long line still not having EOL"

test/plugin/test_in_tail.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,22 @@ def test_encoding_with_bad_character
12231223
assert_equal(Encoding::UTF_8, events[0][2]['message'].encoding)
12241224
end
12251225

1226+
def test_encoding_for_regular_expression_parsing
1227+
conf = CONFIG_READ_FROM_HEAD +
1228+
config_element("", "" , { "encoding" => "utf-8" },
1229+
[config_element("parse", "", { "@type" => "/^あ(?<name>.*)お$/" })])
1230+
1231+
d = create_driver(conf)
1232+
d.run(expect_emits: 1, timeout: 5) do
1233+
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f|
1234+
f.puts "あいうえお"
1235+
}
1236+
end
1237+
events = d.events
1238+
assert_equal(true, events.length > 0)
1239+
assert_equal({"name" => "いうえ"}, events[0][2])
1240+
end
1241+
12261242
sub_test_case "multiline" do
12271243
data(flat: MULTILINE_CONFIG,
12281244
parse: PARSE_MULTILINE_CONFIG)

0 commit comments

Comments
 (0)