Add seq to metadata that it can be unique #2824

Merged
merged 4 commits on Feb 21, 2020
11 changes: 10 additions & 1 deletion lib/fluent/plugin/buf_file.rb
@@ -166,7 +166,16 @@ def resume

case chunk.state
when :staged
stage[chunk.metadata] = chunk
# An unstaged chunk created in Buffer#write_step_by_step is identified as a staged chunk here, because FileChunk#assume_chunk_state checks only the file name.
# https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L663
# This can happen when the fluentd process is killed (by a signal or for other reasons) between creating unstaged chunks and changing them to staged mode in Buffer#write.
# These unstaged chunks share the same metadata, so perform the enqueue step again:
# https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364
if chunk_size_full?(chunk) || stage.key?(chunk.metadata)
queue << chunk.enqueued!
else
stage[chunk.metadata] = chunk
end
when :queued
queue << chunk
end
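The guard above exists because staged chunks are keyed by their metadata. Here is a minimal sketch of the failure it prevents, using a stand-in Meta struct rather than fluentd's actual classes: two Struct instances with equal members are equal hash keys, so without the seq disambiguation a second staged chunk would silently replace the first.

Meta = Struct.new(:timekey, :tag, :variables, :seq)

stage = {}
m1 = Meta.new(1_460_000_000, "t1.test", nil, 0)
m2 = Meta.new(1_460_000_000, "t1.test", nil, 0)  # equal members => same hash key

stage[m1] = :chunk_a
stage[m2] = :chunk_b                             # silently replaces :chunk_a

p stage.size  # => 1 -- one chunk would be lost, hence the fallback to the queue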
18 changes: 15 additions & 3 deletions lib/fluent/plugin/buffer.rb
@@ -60,7 +60,17 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

Metadata = Struct.new(:timekey, :tag, :variables) do
Metadata = Struct.new(:timekey, :tag, :variables, :seq) do
Member:

No need to use seq in cmp_variables and <=>?

Member Author:

Yes. seq is not related to ordering; it's just used for the uniqueness of metadata.

def initialize(timekey, tag, variables)
super(timekey, tag, variables, 0)
end

def dup_next
m = dup
m.seq = seq + 1
m
end

def empty?
timekey.nil? && tag.nil? && variables.nil?
end
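A hypothetical console sketch of how dup_next behaves, assuming the struct definition above; as the review thread notes, <=> ignores seq, so ordering is unaffected while hash-key uniqueness is gained.

m  = Fluent::Plugin::Buffer::Metadata.new(nil, "t1.test", nil)  # seq defaults to 0
m2 = m.dup_next

m2.seq    # => 1
m == m2   # => false -- m and m2 are distinct keys in @stage
m <=> m2  # => 0, because <=> does not consider seq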
@@ -656,13 +666,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
# Then, will generate chunks not staged (not queued) to append rest data.
staged_chunk_used = false
modified_chunks = []
modified_metadata = metadata
get_next_chunk = ->(){
c = if staged_chunk_used
# Staging new chunk here is bad idea:
# Recovering whole state including newly staged chunks is much harder than current implementation.
generate_chunk(metadata)
modified_metadata = modified_metadata.dup_next
generate_chunk(modified_metadata)
else
synchronize{ @stage[metadata] ||= generate_chunk(metadata).staged! }
synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
end
modified_chunks << c
c
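In effect, when one oversized write spills across several chunks, only the first reuses the staged metadata; every additional unstaged chunk receives a copy with an incremented seq. A simplified illustration of the resulting sequence (hypothetical values, not the plugin's internals):

modified_metadata = Fluent::Plugin::Buffer::Metadata.new(nil, "t1.test", nil)
chunk_metas = [modified_metadata]                  # first chunk keeps seq 0
3.times do
  modified_metadata = modified_metadata.dup_next   # each overflow chunk gets a fresh key
  chunk_metas << modified_metadata
end
chunk_metas.map(&:seq)  # => [0, 1, 2, 3]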
2 changes: 2 additions & 0 deletions lib/fluent/plugin/buffer/file_chunk.rb
@@ -232,6 +232,7 @@ def restore_metadata(bindata)
@metadata.timekey = data[:timekey]
@metadata.tag = data[:tag]
@metadata.variables = data[:variables]
@metadata.seq = data[:seq] || 0
end

def restore_metadata_partially(chunk)
@@ -243,6 +244,7 @@ def restore_metadata_partially(chunk)
@metadata.timekey = nil
@metadata.tag = nil
@metadata.variables = nil
@metadata.seq = 0
end

def write_metadata(update: true)
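The data[:seq] || 0 fallback keeps old buffer files readable: a .meta file written before this change has no :seq entry. A minimal sketch of that path, assuming msgpack-serialized metadata as FileChunk uses:

require 'msgpack'

old_meta = { timekey: nil, tag: "t1.test", variables: nil }  # written before seq existed
data = MessagePack.unpack(old_meta.to_msgpack, symbolize_keys: true)

data[:seq]       # => nil
data[:seq] || 0  # => 0, the default assigned on restore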
84 changes: 82 additions & 2 deletions test/plugin/test_buf_file.rb
@@ -23,8 +23,10 @@ def write(chunk)
end

class FileBufferTest < Test::Unit::TestCase
def metadata(timekey: nil, tag: nil, variables: nil)
Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
def metadata(timekey: nil, tag: nil, variables: nil, seq: 0)
m = Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
m.seq = seq
m
end

def write_metadata_old(path, chunk_id, metadata, size, ctime, mtime)
@@ -43,6 +45,7 @@ def write_metadata_old(path, chunk_id, metadata, size, ctime, mtime)
def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
metadata = {
timekey: metadata.timekey, tag: metadata.tag, variables: metadata.variables,
seq: metadata.seq,
id: chunk_id,
s: size,
c: ctime,
@@ -996,6 +999,83 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
end
end

sub_test_case 'there are the same timekey metadata in stage' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
@bufpath = File.join(@bufdir, 'testbuf.*.log')
FileUtils.rm_r(@bufdir) if File.exist?(@bufdir)
FileUtils.mkdir_p(@bufdir)

m = metadata(timekey: event_time('2016-04-17 13:58:00 -0700').to_i)

c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay1"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay2"}].to_json + "\n"
end
write_metadata(p1 + '.meta', c1id, m, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)

c2id = Fluent::UniqueId.generate
p2 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c2id)}.log")
File.open(p2, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay3"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay4"}].to_json + "\n"
end
m2 = m.dup_next
write_metadata(p2 + '.meta', c2id, m2, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)

c3id = Fluent::UniqueId.generate
p3 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c3id)}.log")
File.open(p3, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay5"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay6"}].to_json + "\n"
end
m3 = m2.dup_next
write_metadata(p3 + '.meta', c3id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)

c4id = Fluent::UniqueId.generate
p4 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c4id)}.log")
File.open(p4, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay5"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay6"}].to_json + "\n"
end
write_metadata(p4 + '.meta', c4id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end
teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end

if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end

test '#resume returns each chunks' do
s, e = @p.resume
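# c3 and c4 were written with identical metadata (seq=2), so resume stages one of them and enqueues the duplicate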
assert_equal 3, s.size
assert_equal [0, 1, 2], s.keys.map(&:seq).sort
assert_equal 1, e.size
assert_equal [2], e.map { |e| e.metadata.seq }
end
end

sub_test_case 'there are some non-buffer chunk files, with a path without buffer chunk ids' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
9 changes: 7 additions & 2 deletions test/plugin/test_buffer.rb
@@ -900,9 +900,12 @@ def create_chunk_es(metadata, es)
es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * (128 - 22)}] ] * 45000)
@p.write({@dm0 => es}, format: @format)

# Metadata whose seq is 4 is created, but it is overwritten with the original metadata (seq=0) for the next use of this chunk: https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357
assert_equal [@dm0], @p.stage.keys
assert_equal 5400, @p.stage[@dm0].size
assert_equal [@dm0,@dm0,@dm0,@dm0,@dm0], @p.queue.map(&:metadata)
r = [@dm0]
3.times { |i| r << r[i].dup_next }
assert_equal [@dm0, *r], @p.queue.map(&:metadata)
assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
# 9900 * 4 + 5400 == 45000
end
@@ -990,7 +993,9 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0], @p.stage.keys
assert_equal 900, @p.stage[@dm0].size
assert_equal [@dm0,@dm0,@dm0,@dm0,@dm0], @p.queue.map(&:metadata)
r = [@dm0]
4.times { |i| r << r[i].dup_next }
assert_equal r, @p.queue.map(&:metadata)
assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
##### 900 + 9500 + 9900 * 4 == 5000 + 45000
end
21 changes: 11 additions & 10 deletions test/plugin/test_buffer_file_chunk.rb
@@ -21,7 +21,8 @@ class BufferFileChunkTest < Test::Unit::TestCase
Timecop.return
end

Metadata = Struct.new(:timekey, :tag, :variables)
Metadata = Fluent::Plugin::Buffer::Metadata

def gen_metadata(timekey: nil, tag: nil, variables: nil)
Metadata.new(timekey, tag, variables)
end
@@ -360,7 +361,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal content, File.open(@c.path, 'rb'){|f| f.read }

stored_meta = {
timekey: nil, tag: nil, variables: nil,
timekey: nil, tag: nil, variables: nil, seq: 0,
id: unique_id,
s: size,
c: created_at.to_i,
@@ -424,7 +425,7 @@ def gen_chunk_path(prefix, unique_id)
@c.commit

expected = {
timekey: nil, tag: nil, variables: nil,
timekey: nil, tag: nil, variables: nil, seq: 0,
id: @c.unique_id,
s: @c.size,
c: @c.created_at.to_i,
@@ -442,7 +443,7 @@ def gen_chunk_path(prefix, unique_id)
@c.write_metadata

expected = {
timekey: nil, tag: nil, variables: nil,
timekey: nil, tag: nil, variables: nil, seq: 0,
id: @c.unique_id,
s: @c.size,
c: @c.created_at.to_i,
@@ -453,7 +454,7 @@ def gen_chunk_path(prefix, unique_id)
@c.commit

expected = {
timekey: nil, tag: nil, variables: nil,
timekey: nil, tag: nil, variables: nil, seq: 0,
id: @c.unique_id,
s: @c.size,
c: @c.created_at.to_i,
Expand All @@ -473,7 +474,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal content, File.open(@c.path, 'rb'){|f| f.read }

stored_meta = {
timekey: nil, tag: nil, variables: nil,
timekey: nil, tag: nil, variables: nil, seq: 0,
id: unique_id,
s: size,
c: created_at.to_i,
@@ -518,7 +519,7 @@ def gen_chunk_path(prefix, unique_id)
end

@metadata = {
timekey: nil, tag: 'testing', variables: {k: "x"},
timekey: nil, tag: 'testing', variables: {k: "x"}, seq: 0,
id: @chunk_id,
s: 4,
c: Time.parse('2016-04-07 17:44:00 +0900').to_i,
@@ -591,7 +592,7 @@ def gen_chunk_path(prefix, unique_id)
@c.append([d5s])

metadata = {
timekey: nil, tag: 'testing', variables: {k: "x"},
timekey: nil, tag: 'testing', variables: {k: "x"}, seq: 0,
id: @chunk_id,
s: 4,
c: Time.parse('2016-04-07 17:44:00 +0900').to_i,
@@ -602,7 +603,7 @@ def gen_chunk_path(prefix, unique_id)
@c.write_metadata

metadata = {
timekey: nil, tag: 'testing', variables: {k: "x"},
timekey: nil, tag: 'testing', variables: {k: "x"}, seq: 0,
id: @chunk_id,
s: 5,
c: Time.parse('2016-04-07 17:44:00 +0900').to_i,
@@ -671,7 +672,7 @@ def gen_chunk_path(prefix, unique_id)
@dummy_timekey = Time.parse('2016-04-07 17:40:00 +0900').to_i

@metadata = {
timekey: @dummy_timekey, tag: 'testing', variables: {k: "x"},
timekey: @dummy_timekey, tag: 'testing', variables: {k: "x"}, seq: 0,
id: @chunk_id,
s: 4,
c: Time.parse('2016-04-07 17:44:00 +0900').to_i,