Skip to content

Commit

Permalink
feat(fluentd): allow fluentd_thread label to be configurable (#6240)
Browse files Browse the repository at this point in the history
Signed-off-by: Trevor Wood <Trevor.G.Wood@gmail.com>
  • Loading branch information
taharah authored Jun 2, 2022
1 parent 1c5e094 commit 0470e4e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ to include only the most relevant.
* [5223](https://github.com/grafana/loki/pull/5223) **cyriltovena**: fluent-bit: Attempt to unmarshal nested json.

#### FluentD
* [6240](https://github.com/grafana/loki/pull/6240) **taharah**: Add the feature flag `include_thread_label` to allow the `fluentd_thread` label included when using multiple threads for flushing to be configurable
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum**: fluentd: Fix bug that caused lines to be dropped when containing non utf-8 characters
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum**: Fix encoding error in fluentd client

Expand Down
5 changes: 4 additions & 1 deletion clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class LogPostError < StandardError; end
desc 'if a record only has 1 key, then just set the log line to the value and discard the key.'
config_param :drop_single_key, :bool, default: false

desc 'whether or not to include the fluentd_thread label when multiple threads are used for flushing'
config_param :include_thread_label, :bool, default: true

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
config_set_default :chunk_keys, []
Expand Down Expand Up @@ -332,7 +335,7 @@ def line_to_loki(record)
# unique per flush thread
# note that flush thread != fluentd worker. if you use multiple workers you still need to
# add the worker id as a label
if @buffer_config.flush_thread_count > 1
if @include_thread_label && @buffer_config.flush_thread_count > 1
chunk_labels['fluentd_thread'] = Thread.current[:_fluentd_plugin_helper_thread_title].to_s
end

Expand Down
50 changes: 49 additions & 1 deletion clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq "1546270458000000000"
expect(payload[0]['values'][0][1]).to eq "message=\" rest of line\" number=1.2345 stream=stdout"
expect(payload[0]['values'][0][1]).to eq "message=\"? rest of line\" number=1.2345 stream=stdout"
end

it 'handle non utf-8 characters from log lines in json format' do
Expand Down Expand Up @@ -312,4 +312,52 @@
allow(server_error).to receive(:body).and_return('fake body')
expect { driver.instance.write(lines) }.to raise_error(described_class::LogPostError)
end

context 'when output is multi-thread' do
let(:thread) do
class_double(
'Thread',
current: { _fluentd_plugin_helper_thread_title: 'thread1' }
).as_stubbed_const
end

before do
allow(Thread).to receive(:new).and_yield(thread)
end

it 'adds the fluentd_label by default' do
config = <<-CONF
url https://logs-us-west1.grafana.net
<buffer>
@type memory
flush_thread_count 2
</buffer>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
chunk = [Time.at(1_546_270_458), content[0]]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream']).to eq('fluentd_thread' => 'thread1')
end

it 'does not add the fluentd_label when configured' do
config = <<-CONF
url https://logs-us-west1.grafana.net
include_thread_label false
<buffer>
@type memory
flush_thread_count 2
</buffer>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
chunk = [Time.at(1_546_270_458), content[0]]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq(true)
end
end
end
1 change: 1 addition & 0 deletions docs/sources/clients/fluentd/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ There are few configurations settings to control the output format.
- remove_keys: (default: nil) comma separated list of needless record keys to remove. All other keys will be placed into the log line. You can use [record_accessor syntax](https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor#syntax).
- line_format (default:key_value): format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format `<key>=<value>`.
- drop_single_key: if set to true and a record only has 1 key after extracting `<label></label>` blocks, set the log line to the value and discard the key.
- include_thread_label (default: true): whether or not to include the fluentd_thread label when multiple threads are used for flushing.

### Buffer options

Expand Down

0 comments on commit 0470e4e

Please sign in to comment.