diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fb29dd249f..ef8246f7e8 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -484,8 +484,8 @@ def close_watcher_handles end # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. - def update_watcher(target_info, pe) - path = target_info.path + def update_watcher(tail_watcher, pe, new_inode) + path = tail_watcher.path log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds") @@ -499,23 +499,26 @@ def update_watcher(target_info, pe) end end - rotated_tw = @tails[path] + new_target_info = TargetInfo.new(path, new_inode) if @follow_inodes - new_position_entry = @pf[target_info] + # When follow_inodes is true, it's not cleaned up by refresh_watchers. + # So it should be unwatched here explicitly. + tail_watcher.unwatched = true + new_position_entry = @pf[new_target_info] + # If `refresh_watchers` found the new file earlier, this will not be zero. + # In this case, all we have to do is detach the current tail_watcher. if new_position_entry.read_inode == 0 - # When follow_inodes is true, it's not cleaned up by refresh_watcher. - # So it should be unwatched here explicitly. - rotated_tw.unwatched = true if rotated_tw - @tails[path] = setup_watcher(target_info, new_position_entry) + @tails[path] = setup_watcher(new_target_info, new_position_entry) @tails[path].on_notify end else - @tails[path] = setup_watcher(target_info, pe) + @tails[path] = setup_watcher(new_target_info, pe) @tails[path].on_notify end - detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw + + detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode) end # TailWatcher#close is called by another thread at shutdown phase. 
@@ -877,18 +880,14 @@ def on_rotate(stat) # No need to update a watcher if stat is nil (file not present), because moving to inodes will create # new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method # don't want to swap state because we need latest read offset in pos file even after rotate_wait - if stat - target_info = TargetInfo.new(@path, stat.ino) - @update_watcher.call(target_info, @pe) - end + @update_watcher.call(self, @pe, stat.ino) if stat else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. - target_info = TargetInfo.new(@path, stat ? stat.ino : nil) - @update_watcher.call(target_info, swap_state(@pe)) + @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index b1e28fc986..81bf0dfa7d 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2638,4 +2638,279 @@ def test_lines_collected_with_no_throttling(data) end end end + + sub_test_case "Update watchers for rotation with follow_inodes" do + def test_updateTW_before_refreshTW_and_detach_before_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "1s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. 
+ "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # The old TailWatcher is detached here since `rotate_wait` is just `1s`. + sleep 3 + + # This reproduces the following situation: + # Rotation => update_watcher => refresh_watchers + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + # TODO: This is a bug! We need to fix it and replace this with the commented-out expectation that follows. 
+ record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], + # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + # TODO: This is a bug! We need to fix it and replace this with the commented-out expectation that follows. + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], + ], + # position_entries: [ + # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + # ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + + def test_updateTW_before_refreshTW_and_detach_after_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher after refresh_watchers. + "rotate_wait" => "4s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. 
+ "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + sleep 2 + + # This reproduces the following situation: + # Rotation => update_watcher => refresh_watchers + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # The old TailWatcher is detached here since `rotate_wait` is `4s`. + sleep 3 + + # Append to the new current log file. 
+ Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + + # The scenario where in_tail wrongly detaches TailWatcher. + # This is reported in https://github.com/fluent/fluentd/issues/4190. + def test_updateTW_after_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "1s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. 
+ "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # This reproduces the following situation: + # Rotation => refresh_watchers => update_watcher + # This adds a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1) + # This overwrites `@tails["tail.txt"]` + d.instance.refresh_watchers + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # The old TailWatcher is detached here since `rotate_wait` is just `1s`. + sleep 3 + + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # Append to the new current log file. 
+ Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + # TODO: This is a bug! We need to fix it and replace this with the commented-out expectation that follows. + record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], + # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + # TODO: This is a bug! We need to fix it and replace this with the commented-out expectation that follows. + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], + ], + # position_entries: [ + # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + # ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + end end