Skip to content

Commit

Permalink
Make stage_size & stage computation thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Nelakurthi <nlh125@gmail.com>
  • Loading branch information
Harish Nelakurthi committed Dec 12, 2019
1 parent 029bb4b commit 4383919
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)

log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }

staged_bytesize = 0
operated_chunks = []
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}

begin
# sort metadata to get lock of chunks in same order with other threads
Expand All @@ -283,7 +283,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
staged_bytesize += adding_bytesize
staged_bytesizes_by_chunk[chunk] = 0 if staged_bytesizes_by_chunk[chunk].nil?
staged_bytesizes_by_chunk[chunk] += adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
Expand Down Expand Up @@ -330,27 +331,39 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)

# All locks about chunks are released.

synchronize do
# At here, staged chunks may be enqueued by other threads.
@stage_size += staged_bytesize

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
#
# Now update the stage, stage_size with proper locking
# FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
if chunk.staged?
synchronize { @stage_size += bytesize }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end
end

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
u.synchronize do
if u.unstaged? && !chunk_size_full?(u)
@stage[m] = u.staged!
@stage_size += u.bytesize
synchronize {
@stage[m] = u.staged!
@stage_size += u.bytesize
}
end
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# previously staged chunk is already enqueued, closed or purged.
# no problem.
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# previously staged chunk is already enqueued, closed or purged.
# no problem.
end
end

Expand Down

0 comments on commit 4383919

Please sign in to comment.