Skip to content

Commit

Permalink
Handle bytes limits more elastically
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed May 11, 2021
1 parent 27bb805 commit 34fcda1
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@io = nil
@notify_mutex = Mutex.new
@log = log
@start_reading = 0
@last_read_time = 0
@number_bytes_read = 0

Expand All @@ -931,19 +932,19 @@ def opened?

private

def read_bytes_limits_reached?(number_bytes_read)
number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
def read_bytes_limits_reached?
@number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
end

def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
return false unless read_bytes_limits_reached?(number_bytes_read)
def limit_bytes_per_second_reached?
return false unless read_bytes_limits_reached?

# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - start_reading
time_spent_reading = Fluent::Clock.now - @start_reading
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if time_spent_reading < 1
needed_sleeping_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.")
estimated_waiting_ingestion_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again after #{estimated_waiting_ingestion_time} second(s) or later.")
return true
end

Expand All @@ -953,15 +954,18 @@ def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
def refresh_bytes_read_counter
if Fluent::Clock.now - @last_read_time > 1
@number_bytes_read = 0
@start_reading = Fluent::Clock.now
end

yield
end

def handle_notify
with_io do |io|
return if limit_bytes_per_second_reached?

begin
start_reading = Fluent::Clock.now
@start_reading = Fluent::Clock.now
read_more = false

if !io.nil? && @lines.empty?
Expand All @@ -980,7 +984,7 @@ def handle_notify
read_more = true
break
end
if limit_bytes_per_second_reached?(start_reading, @number_bytes_read)
if limit_bytes_per_second_reached?
# Just get out from tailing loop.
read_more = false
break
Expand Down

0 comments on commit 34fcda1

Please sign in to comment.