Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the current list of timekeys as a buffer metric #2343

Merged
merged 1 commit into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def initialize
@dequeued_num = {} # metadata => int (number of dequeued chunks)

@stage_size = @queue_size = 0
@timekeys = Hash.new(0)
@metadata_list = [] # keys of @stage
end

Expand All @@ -176,12 +177,14 @@ def start
@stage.each_pair do |metadata, chunk|
@metadata_list << metadata unless @metadata_list.include?(metadata)
@stage_size += chunk.bytesize
add_timekey(metadata)
end
@queue.each do |chunk|
@metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata)
@queued_num[chunk.metadata] ||= 0
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
add_timekey(chunk.metadata)
end
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end
Expand All @@ -206,6 +209,7 @@ def terminate
super
@dequeued = @stage = @queue = @queued_num = @metadata_list = nil
@stage_size = @queue_size = 0
@timekeys.clear
end

def storable?
Expand Down Expand Up @@ -251,6 +255,7 @@ def add_metadata(metadata)
@metadata_list[i]
else
@metadata_list << metadata
add_timekey(metadata)
metadata
end
end
Expand All @@ -261,6 +266,30 @@ def metadata(timekey: nil, tag: nil, variables: nil)
add_metadata(meta)
end

def add_timekey(metadata)
if t = metadata.timekey
@timekeys[t] += 1
end
nil
end
private :add_timekey

def del_timekey(metadata)
if t = metadata.timekey
if @timekeys[t] <= 1
@timekeys.delete(t)
else
@timekeys[t] -= 1
end
end
nil
end
private :del_timekey

def timekeys
@timekeys.keys
end

# metadata MUST have consistent object_id for each variation
# data MUST be Array of serialized events, or EventStream
# metadata_and_data MUST be a hash of { metadata => data }
Expand Down Expand Up @@ -506,6 +535,7 @@ def purge_chunk(chunk_id)
@metadata_list.delete(metadata)
@queued_num.delete(metadata)
@dequeued_num.delete(metadata)
del_timekey(metadata)
end
log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ def shutdown
MONITOR_INFO = {
'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) },
'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
}
Expand Down
18 changes: 15 additions & 3 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def create_chunk_es(metadata, es)

assert_equal 0, plugin.stage_size
assert_equal 0, plugin.queue_size
assert_equal [], plugin.timekeys

# @p is started plugin

Expand Down Expand Up @@ -242,6 +243,7 @@ def create_chunk_es(metadata, es)
assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list
assert_equal 0, @p.stage_size
assert_equal 0, @p.queue_size
assert_equal [], @p.timekeys
end

test '#metadata_list returns list of metadata on stage or in queue' do
Expand Down Expand Up @@ -569,9 +571,12 @@ def create_chunk_es(metadata, es)
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3], @p.stage.keys

prev_stage_size = @p.stage_size
timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i
assert !@p.timekeys.include?(timekey)

m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
prev_stage_size = @p.stage_size

m = @p.metadata(timekey: timekey)

@p.write({m => ["x" * 256, "y" * 256, "z" * 256]})

Expand All @@ -581,6 +586,8 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3,m], @p.stage.keys

assert @p.timekeys.include?(timekey)
end

test '#write tries to enqueue and store data into a new chunk if existing chunk is full' do
Expand Down Expand Up @@ -688,8 +695,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3], @p.stage.keys

timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i
assert !@p.timekeys.include?(timekey)

m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
m = @p.metadata(timekey: timekey)

es = Fluent::ArrayEventStream.new(
[
Expand All @@ -708,6 +718,8 @@ def create_chunk_es(metadata, es)
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3,m], @p.stage.keys
assert_equal 1, @p.stage[m].append_count

assert @p.timekeys.include?(timekey)
end

test '#write w/ format tries to enqueue and store data into a new chunk if existing chunk does not have enough space' do
Expand Down
14 changes: 10 additions & 4 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def test_configure
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
error_label_info = {
"buffer_queue_length" => 0,
"buffer_timekeys" => [],
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
Expand Down Expand Up @@ -296,6 +297,7 @@ def get(uri, header = {})
expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config
expected_null_response = {
"buffer_queue_length" => 0,
"buffer_timekeys" => [],
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
Expand Down Expand Up @@ -333,6 +335,7 @@ def get(uri, header = {})
expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config
expected_null_response = {
"buffer_queue_length" => 0,
"buffer_timekeys" => [],
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
Expand Down Expand Up @@ -367,6 +370,7 @@ def get(uri, header = {})
}
expected_null_response = {
"buffer_queue_length" => 0,
"buffer_timekeys" => [],
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
Expand Down Expand Up @@ -434,7 +438,8 @@ def write(chunk)
<match **>
@type test_out_fail_write
@id test_out_fail_write
<buffer>
<buffer time>
timekey 1m
flush_mode immediate
</buffer>
</match>
Expand All @@ -453,17 +458,18 @@ def write(chunk)
include_config no
")
d.instance.start
output = @ra.outputs[0]
output.start
output.after_start
expected_test_out_fail_write_response = {
"buffer_queue_length" => 1,
"buffer_timekeys" => [output.calculate_timekey(event_time)],
"buffer_total_queued_size" => 40,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "test_out_fail_write",
"type" => "test_out_fail_write",
}
output = @ra.outputs[0]
output.start
output.after_start
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
# flush few times to check steps
2.times do
Expand Down