From 6f98c3034e247ad987aa312f1300099957f59ad6 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 3 Mar 2020 15:16:28 +0900 Subject: [PATCH] metadata.seq should be 0 when counting `@queued_num`, the difference of seq's value cause the bug at Buffer#optimistic_queued? https://github.com/fluent/fluentd/blob/34987470df995965f4791c6d9ba2dd1c272a70ad/lib/fluent/plugin/buffer.rb#L782 Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buf_file.rb | 1 + lib/fluent/plugin/buffer.rb | 2 ++ test/plugin/test_buffer.rb | 12 +++--------- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 40574a2d48..84a73921ca 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -170,6 +170,7 @@ def resume # these chunks(unstaged chunks) has shared the same metadata # So perform enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364 if chunk_size_full?(chunk) || stage.key?(chunk.metadata) + chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num queue << chunk.enqueued! else stage[chunk.metadata] = chunk diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 57eba6e2b8..3e8f95edda 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -428,6 +428,7 @@ def enqueue_chunk(metadata) if chunk.empty? chunk.close else + chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! @@ -446,6 +447,7 @@ def enqueue_unstaged_chunk(chunk) synchronize do chunk.synchronize do metadata = chunk.metadata + metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 17528daa3b..0b995dd6e8 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -903,9 +903,7 @@ def create_chunk_es(metadata, es) # metadata whose seq is 4 is created, but overwrite with original metadata(seq=0) for next use of this chunk https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357 assert_equal [@dm0], @p.stage.keys assert_equal 5400, @p.stage[@dm0].size - r = [@dm0] - 3.times { |i| r << r[i].dup_next } - assert_equal [@dm0, *r], @p.queue.map(&:metadata) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata) assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ... # 9900 * 4 + 5400 == 45000 end @@ -922,9 +920,7 @@ def create_chunk_es(metadata, es) dequeued_chunks = 6.times.map { |e| @p.dequeue_chunk } # splits: 45000 / 100 => 450 * ... assert_equal [5000, 9900, 9900, 9900, 9900, 5400], dequeued_chunks.map(&:size) - r = [@dm0] - 3.times { |i| r << r[i].dup_next } - assert_equal [@dm0, *r, @dm0], dequeued_chunks.map(&:metadata) # last last one's metadata.seq is 0 + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0, @dm0], dequeued_chunks.map(&:metadata) end test '#write raises BufferChunkOverflowError if a record is biggar than chunk limit size' do @@ -1010,9 +1006,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0], @p.stage.keys assert_equal 900, @p.stage[@dm0].size - r = [@dm0] - 4.times { |i| r << r[i].dup_next } - assert_equal r, @p.queue.map(&:metadata) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata) assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ... ##### 900 + 9500 + 9900 * 4 == 5000 + 45000 end