Expose the current list of timekeys as a buffer metric
This is useful for monitoring things like:

 - How old is the oldest log message still in the buffer?

 - Equivalently, what is the maximum latency of the logging system
   as a whole? This might be interesting to track over time.

 - How recent is the *newest* log message in the buffer? If even the
   newest message is old, the delay is probably upstream of fluentd.
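
For example (illustrative only, not part of this change), a monitoring
check could turn the exposed timekeys into an oldest-message age, where
timekeys is the Array of epoch integers returned by Buffer#timekeys:

  # Ages are approximate: a timekey marks the start of its time bucket.
  def oldest_message_age(timekeys, now = Time.now.to_i)
    return 0 if timekeys.empty?
    now - timekeys.min # seconds since the oldest timekey still buffered
  end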

Signed-off-by: Steven McDonald <steven.mcdonald@usabilla.com>
stevenjm committed Mar 22, 2019
1 parent f653461 commit 8d37c85
Showing 4 changed files with 56 additions and 7 deletions.
lib/fluent/plugin/buffer.rb (30 additions, 0 deletions)
@@ -154,6 +154,7 @@ def initialize
       @dequeued_num = {} # metadata => int (number of dequeued chunks)
 
       @stage_size = @queue_size = 0
+      @timekeys = Hash.new(0)
       @metadata_list = [] # keys of @stage
     end
 
@@ -176,12 +177,14 @@ def start
       @stage.each_pair do |metadata, chunk|
         @metadata_list << metadata unless @metadata_list.include?(metadata)
         @stage_size += chunk.bytesize
+        add_timekey(metadata)
       end
       @queue.each do |chunk|
         @metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata)
         @queued_num[chunk.metadata] ||= 0
         @queued_num[chunk.metadata] += 1
         @queue_size += chunk.bytesize
+        add_timekey(chunk.metadata)
       end
       log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
     end
@@ -206,6 +209,7 @@ def terminate
       super
       @dequeued = @stage = @queue = @queued_num = @metadata_list = nil
       @stage_size = @queue_size = 0
+      @timekeys.clear
     end
 
     def storable?
@@ -251,6 +255,7 @@ def add_metadata(metadata)
         @metadata_list[i]
       else
         @metadata_list << metadata
+        add_timekey(metadata)
         metadata
       end
     end
@@ -261,6 +266,30 @@ def metadata(timekey: nil, tag: nil, variables: nil)
       add_metadata(meta)
     end
 
+    def add_timekey(metadata)
+      if t = metadata.timekey
+        @timekeys[t] += 1
+      end
+      nil
+    end
+    private :add_timekey
+
+    def del_timekey(metadata)
+      if t = metadata.timekey
+        if @timekeys[t] <= 1
+          @timekeys.delete(t)
+        else
+          @timekeys[t] -= 1
+        end
+      end
+      nil
+    end
+    private :del_timekey
+
+    def timekeys
+      @timekeys.keys
+    end
+
     # metadata MUST have consistent object_id for each variation
     # data MUST be Array of serialized events, or EventStream
     # metadata_and_data MUST be a hash of { metadata => data }
@@ -506,6 +535,7 @@ def purge_chunk(chunk_id)
           @metadata_list.delete(metadata)
           @queued_num.delete(metadata)
           @dequeued_num.delete(metadata)
+          del_timekey(metadata)
         end
         log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
       end
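
The @timekeys hash above is a reference count per timekey rather than a
plain set: a timekey appears when the first chunk metadata carrying it
is registered, and drops out of #timekeys only when the last such
metadata is purged. A minimal standalone sketch of that behavior
(assumed names, not the fluentd classes):

  timekeys = Hash.new(0) # timekey (epoch int) => reference count

  add = ->(t) { timekeys[t] += 1 }
  del = ->(t) { timekeys[t] <= 1 ? timekeys.delete(t) : (timekeys[t] -= 1) }

  add.call(1460392800)   # two metadata entries share one timekey
  add.call(1460392800)
  del.call(1460392800)
  timekeys.keys          # => [1460392800], still referenced once
  del.call(1460392800)
  timekeys.keys          # => [], last reference gone
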
lib/fluent/plugin/in_monitor_agent.rb (1 addition, 0 deletions)
@@ -285,6 +285,7 @@ def shutdown
     MONITOR_INFO = {
       'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) },
       'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
+      'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
       'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
       'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
     }
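
With this entry in place, the monitor agent reports the new field next
to the existing buffer metrics. A sketch of consuming it over HTTP,
assuming a monitor_agent source on its default port 24220 and its
/api/plugins.json endpoint:

  require 'json'
  require 'net/http'

  # Poll the monitor agent and report, for each buffered output plugin,
  # how old its oldest timekey is.
  body = Net::HTTP.get(URI('http://localhost:24220/api/plugins.json'))
  now = Time.now.to_i
  JSON.parse(body)['plugins'].each do |plugin|
    timekeys = plugin['buffer_timekeys']
    next if timekeys.nil? || timekeys.empty?
    puts "#{plugin['plugin_id']}: oldest timekey is #{now - timekeys.min}s old"
  end
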
test/plugin/test_buffer.rb (15 additions, 3 deletions)
@@ -189,6 +189,7 @@ def create_chunk_es(metadata, es)
 
       assert_equal 0, plugin.stage_size
       assert_equal 0, plugin.queue_size
+      assert_equal [], plugin.timekeys
 
       # @p is started plugin
 
@@ -242,6 +243,7 @@ def create_chunk_es(metadata, es)
       assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list
       assert_equal 0, @p.stage_size
       assert_equal 0, @p.queue_size
+      assert_equal [], @p.timekeys
     end
 
     test '#metadata_list returns list of metadata on stage or in queue' do
@@ -569,9 +571,12 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
       assert_equal [@dm2,@dm3], @p.stage.keys
 
-      prev_stage_size = @p.stage_size
+      timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i
+      assert !@p.timekeys.include?(timekey)
 
-      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+      prev_stage_size = @p.stage_size
+
+      m = @p.metadata(timekey: timekey)
 
       @p.write({m => ["x" * 256, "y" * 256, "z" * 256]})
 
@@ -581,6 +586,8 @@ def create_chunk_es(metadata, es)
 
       assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
       assert_equal [@dm2,@dm3,m], @p.stage.keys
+
+      assert @p.timekeys.include?(timekey)
     end
 
     test '#write tries to enqueue and store data into a new chunk if existing chunk is full' do
@@ -688,8 +695,11 @@ def create_chunk_es(metadata, es)
 
       assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
       assert_equal [@dm2,@dm3], @p.stage.keys
 
+      timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i
+      assert !@p.timekeys.include?(timekey)
+
-      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+      m = @p.metadata(timekey: timekey)
 
       es = Fluent::ArrayEventStream.new(
         [
@@ -708,6 +718,8 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
       assert_equal [@dm2,@dm3,m], @p.stage.keys
       assert_equal 1, @p.stage[m].append_count
+
+      assert @p.timekeys.include?(timekey)
     end
 
     test '#write w/ format tries to enqueue and store data into a new chunk if existing chunk does not have enough space' do
test/plugin/test_in_monitor_agent.rb (10 additions, 4 deletions)
@@ -123,6 +123,7 @@ def test_configure
       output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
       error_label_info = {
         "buffer_queue_length" => 0,
+        "buffer_timekeys" => [],
         "buffer_total_queued_size" => 0,
         "output_plugin" => true,
         "plugin_category" => "output",
@@ -296,6 +297,7 @@ def get(uri, header = {})
       expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config
       expected_null_response = {
         "buffer_queue_length" => 0,
+        "buffer_timekeys" => [],
         "buffer_total_queued_size" => 0,
         "output_plugin" => true,
         "plugin_category" => "output",
@@ -333,6 +335,7 @@ def get(uri, header = {})
       expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config
       expected_null_response = {
         "buffer_queue_length" => 0,
+        "buffer_timekeys" => [],
         "buffer_total_queued_size" => 0,
         "output_plugin" => true,
         "plugin_category" => "output",
@@ -367,6 +370,7 @@ def get(uri, header = {})
       }
       expected_null_response = {
         "buffer_queue_length" => 0,
+        "buffer_timekeys" => [],
         "buffer_total_queued_size" => 0,
         "output_plugin" => true,
         "plugin_category" => "output",
@@ -434,7 +438,8 @@ def write(chunk)
       <match **>
         @type test_out_fail_write
         @id test_out_fail_write
-        <buffer>
+        <buffer time>
+          timekey 1m
           flush_mode immediate
         </buffer>
       </match>
@@ -453,17 +458,18 @@ def write(chunk)
       include_config no
     ")
     d.instance.start
+    output = @ra.outputs[0]
+    output.start
+    output.after_start
     expected_test_out_fail_write_response = {
       "buffer_queue_length" => 1,
+      "buffer_timekeys" => [output.calculate_timekey(event_time)],
       "buffer_total_queued_size" => 40,
       "output_plugin" => true,
       "plugin_category" => "output",
       "plugin_id" => "test_out_fail_write",
       "type" => "test_out_fail_write",
     }
-    output = @ra.outputs[0]
-    output.start
-    output.after_start
     output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
     # flush few times to check steps
     2.times do
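
The expected buffer_timekeys value is computed with the output's own
calculate_timekey rather than hard-coded, so the assertion holds
wherever event_time falls within its one-minute bucket. The truncation
is roughly the following (a sketch, not the exact fluentd
implementation):

  require 'time'

  # For <buffer time> with timekey 1m, an event time is truncated down
  # to the start of its 60-second bucket.
  def truncate_to_timekey(time, timekey = 60)
    t = time.to_i
    t - (t % timekey)
  end

  truncate_to_timekey(Time.parse('2016-04-11 16:40:37 +0000'))
  # => 1460392800, i.e. 2016-04-11 16:40:00 +0000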
