Skip to content

Commit

Permalink
in_tail: Simplify checking read_bytes_limit_per_second
Browse files Browse the repository at this point in the history
Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed May 11, 2021
1 parent 34fcda1 commit 4f921a3
Showing 1 changed file with 14 additions and 29 deletions.
43 changes: 14 additions & 29 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -908,8 +908,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@io = nil
@notify_mutex = Mutex.new
@log = log
@start_reading = 0
@last_read_time = 0
@start_reading_time = nil
@number_bytes_read = 0

@log.info "following tail of #{@path}"
Expand All @@ -932,49 +931,36 @@ def opened?

private

def read_bytes_limits_reached?
@number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
end

def limit_bytes_per_second_reached?
return false unless read_bytes_limits_reached?
return false if @read_bytes_limit_per_second < 0 # not enabled by conf
return false if @number_bytes_read < @read_bytes_limit_per_second

# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - @start_reading
@start_reading_time ||= Fluent::Clock.now
time_spent_reading = Fluent::Clock.now - @start_reading_time
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if time_spent_reading < 1
estimated_waiting_ingestion_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again after #{estimated_waiting_ingestion_time} second(s) or later.")
return true
end

false
end

def refresh_bytes_read_counter
if Fluent::Clock.now - @last_read_time > 1
if time_spent_reading < 1
true
else
@start_reading_time = nil
@number_bytes_read = 0
@start_reading = Fluent::Clock.now
false
end

yield
end

def handle_notify
with_io do |io|
return if limit_bytes_per_second_reached?
return if limit_bytes_per_second_reached?

with_io do |io|
begin
@start_reading = Fluent::Clock.now
read_more = false

if !io.nil? && @lines.empty?
begin
while true
@start_reading_time ||= Fluent::Clock.now
data = io.readpartial(BYTES_TO_READ, @iobuf)
refresh_bytes_read_counter do
@number_bytes_read += data.bytesize
end
@number_bytes_read += data.bytesize
@fifo << data
@fifo.read_lines(@lines)

Expand All @@ -989,7 +975,6 @@ def handle_notify
read_more = false
break
end
@last_read_time = Fluent::Clock.now
end
rescue EOFError
end
Expand Down

0 comments on commit 4f921a3

Please sign in to comment.