metadata.seq should be 0
When counting `@queued_num`, metadata whose `seq` values differ causes a bug in
`Buffer#optimistic_queued?`:
https://github.com/fluent/fluentd/blob/34987470df995965f4791c6d9ba2dd1c272a70ad/lib/fluent/plugin/buffer.rb#L782
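For context, a minimal sketch of the failure mode, using a hypothetical `Metadata` stand-in (a plain Ruby `Struct`, not fluentd's actual class): struct equality compares every member, so metadata that differs only in `seq` is counted under a different hash key than the one `optimistic_queued?` later probes.

# Hypothetical stand-in for fluentd's metadata; the real class may differ.
# Struct equality compares all members, so a different seq is a different key.
Metadata = Struct.new(:timekey, :tag, :seq)

queued_num = Hash.new(0)

original = Metadata.new(0, "test", 0)
split    = Metadata.new(0, "test", 4) # chunk metadata after four splits

queued_num[split] += 1        # enqueued without resetting seq
puts queued_num[original] > 0 # => false: the queued chunk goes uncounted

fixed = Metadata.new(0, "test", 0) # seq reset to 0 before enqueueing
queued_num[fixed] += 1
puts queued_num[original] > 0 # => true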

Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
ganmacs committed Mar 3, 2020
1 parent 77654e9 commit 6f98c30
Showing 3 changed files with 6 additions and 9 deletions.
1 change: 1 addition & 0 deletions lib/fluent/plugin/buf_file.rb
@@ -170,6 +170,7 @@ def resume
# these chunks (unstaged chunks) share the same metadata,
# so perform the enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364
if chunk_size_full?(chunk) || stage.key?(chunk.metadata)
+ chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
queue << chunk.enqueued!
else
stage[chunk.metadata] = chunk
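Where the nonzero `seq` values come from: the removed test lines below show that splitting a large `#write` duplicates the metadata via `dup_next`. A rough sketch, assuming `dup_next` simply copies the metadata with `seq` incremented by one (consistent with the test comment about "metadata whose seq is 4"):

# Sketch only; assumes dup_next copies metadata and bumps seq by one.
class Metadata < Struct.new(:timekey, :tag, :seq)
  def dup_next
    m = dup
    m.seq = seq + 1
    m
  end
end

m = Metadata.new(0, "test", 0)
chain = [m]
3.times { |i| chain << chain[i].dup_next }
chain.map(&:seq) # => [0, 1, 2, 3]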
2 changes: 2 additions & 0 deletions lib/fluent/plugin/buffer.rb
@@ -428,6 +428,7 @@ def enqueue_chunk(metadata)
if chunk.empty?
chunk.close
else
+ chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
@@ -446,6 +447,7 @@ def enqueue_unstaged_chunk(chunk)
synchronize do
chunk.synchronize do
metadata = chunk.metadata
+ metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
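Both enqueue paths above now zero `seq` before pushing to `@queue`, so the `@queued_num` increment and a later `optimistic_queued?`-style lookup agree on the key. A condensed, paraphrased view of that invariant (free-standing methods for illustration, not fluentd's actual signatures):

# Paraphrased invariant, not fluentd's exact code: every chunk entering the
# queue is counted under its seq-zeroed metadata, the same key probed later.
def enqueue(chunk, queue, queued_num)
  chunk.metadata.seq = 0
  queue << chunk
  queued_num[chunk.metadata] = queued_num.fetch(chunk.metadata, 0) + 1
  chunk.enqueued!
end

def optimistic_queued?(queued_num, metadata)
  queued_num.fetch(metadata, 0) > 0
end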
12 changes: 3 additions & 9 deletions test/plugin/test_buffer.rb
@@ -903,9 +903,7 @@ def create_chunk_es(metadata, es)
# metadata whose seq is 4 is created, but overwritten with the original metadata (seq=0) for the next use of this chunk https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357
assert_equal [@dm0], @p.stage.keys
assert_equal 5400, @p.stage[@dm0].size
- r = [@dm0]
- 3.times { |i| r << r[i].dup_next }
- assert_equal [@dm0, *r], @p.queue.map(&:metadata)
+ assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata)
assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
# 9900 * 4 + 5400 == 45000
end
@@ -922,9 +920,7 @@ def create_chunk_es(metadata, es)

dequeued_chunks = 6.times.map { |e| @p.dequeue_chunk } # splits: 45000 / 100 => 450 * ...
assert_equal [5000, 9900, 9900, 9900, 9900, 5400], dequeued_chunks.map(&:size)
- r = [@dm0]
- 3.times { |i| r << r[i].dup_next }
- assert_equal [@dm0, *r, @dm0], dequeued_chunks.map(&:metadata) # the last one's metadata.seq is 0
+ assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0, @dm0], dequeued_chunks.map(&:metadata)
end

test '#write raises BufferChunkOverflowError if a record is bigger than chunk limit size' do
@@ -1010,9 +1006,7 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0], @p.stage.keys
assert_equal 900, @p.stage[@dm0].size
- r = [@dm0]
- 4.times { |i| r << r[i].dup_next }
- assert_equal r, @p.queue.map(&:metadata)
+ assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata)
assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
##### 900 + 9500 + 9900 * 4 == 5000 + 45000
end
