From 0470e4e7d38e7b364acfd230b20f4237c8ef05d0 Mon Sep 17 00:00:00 2001 From: Trevor Wood Date: Thu, 2 Jun 2022 15:03:47 -0400 Subject: [PATCH] feat(fluentd): allow fluentd_thread label to be configurable (#6240) Signed-off-by: Trevor Wood --- CHANGELOG.md | 1 + .../cmd/fluentd/lib/fluent/plugin/out_loki.rb | 5 +- .../gems/fluent/plugin/loki_output_spec.rb | 50 ++++++++++++++++++- docs/sources/clients/fluentd/_index.md | 1 + 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdc3757ea60b..6f525b190c65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -312,6 +312,7 @@ to include only the most relevant. * [5223](https://github.com/grafana/loki/pull/5223) **cyriltovena**: fluent-bit: Attempt to unmarshal nested json. #### FluentD +* [6240](https://github.com/grafana/loki/pull/6240) **taharah**: Add the feature flag `include_thread_label` to allow the `fluentd_thread` label included when using multiple threads for flushing to be configurable * [5107](https://github.com/grafana/loki/pull/5107) **chaudum**: fluentd: Fix bug that caused lines to be dropped when containing non utf-8 characters * [5163](https://github.com/grafana/loki/pull/5163) **chaudum**: Fix encoding error in fluentd client diff --git a/clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb b/clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb index 676670c10e1d..7b1e2f2c02c8 100644 --- a/clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb +++ b/clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb @@ -73,6 +73,9 @@ class LogPostError < StandardError; end desc 'if a record only has 1 key, then just set the log line to the value and discard the key.' config_param :drop_single_key, :bool, default: false + desc 'whether or not to include the fluentd_thread label when multiple threads are used for flushing' + config_param :include_thread_label, :bool, default: true + config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE config_set_default :chunk_keys, [] @@ -332,7 +335,7 @@ def line_to_loki(record) # unique per flush thread # note that flush thread != fluentd worker. if you use multiple workers you still need to # add the worker id as a label - if @buffer_config.flush_thread_count > 1 + if @include_thread_label && @buffer_config.flush_thread_count > 1 chunk_labels['fluentd_thread'] = Thread.current[:_fluentd_plugin_helper_thread_title].to_s end diff --git a/clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb b/clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb index 162730e8b3bc..008cfa1c7354 100644 --- a/clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb +++ b/clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb @@ -123,7 +123,7 @@ expect(payload[0]['stream'].empty?).to eq true expect(payload[0]['values'].count).to eq 1 expect(payload[0]['values'][0][0]).to eq "1546270458000000000" - expect(payload[0]['values'][0][1]).to eq "message=\"� rest of line\" number=1.2345 stream=stdout" + expect(payload[0]['values'][0][1]).to eq "message=\"? rest of line\" number=1.2345 stream=stdout" end it 'handle non utf-8 characters from log lines in json format' do @@ -312,4 +312,52 @@ allow(server_error).to receive(:body).and_return('fake body') expect { driver.instance.write(lines) }.to raise_error(described_class::LogPostError) end + + context 'when output is multi-thread' do + let(:thread) do + class_double( + 'Thread', + current: { _fluentd_plugin_helper_thread_title: 'thread1' } + ).as_stubbed_const + end + + before do + allow(Thread).to receive(:new).and_yield(thread) + end + + it 'adds the fluentd_label by default' do + config = <<-CONF + url https://logs-us-west1.grafana.net + + + @type memory + flush_thread_count 2 + + CONF + driver = Fluent::Test::Driver::Output.new(described_class) + driver.configure(config) + content = File.readlines('spec/gems/fluent/plugin/data/syslog2') + chunk = [Time.at(1_546_270_458), content[0]] + payload = driver.instance.generic_to_loki([chunk]) + expect(payload[0]['stream']).to eq('fluentd_thread' => 'thread1') + end + + it 'does not add the fluentd_label when configured' do + config = <<-CONF + url https://logs-us-west1.grafana.net + include_thread_label false + + + @type memory + flush_thread_count 2 + + CONF + driver = Fluent::Test::Driver::Output.new(described_class) + driver.configure(config) + content = File.readlines('spec/gems/fluent/plugin/data/syslog2') + chunk = [Time.at(1_546_270_458), content[0]] + payload = driver.instance.generic_to_loki([chunk]) + expect(payload[0]['stream'].empty?).to eq(true) + end + end end diff --git a/docs/sources/clients/fluentd/_index.md b/docs/sources/clients/fluentd/_index.md index f9691e390e5a..76a252d1633d 100644 --- a/docs/sources/clients/fluentd/_index.md +++ b/docs/sources/clients/fluentd/_index.md @@ -257,6 +257,7 @@ There are few configurations settings to control the output format. - remove_keys: (default: nil) comma separated list of needless record keys to remove. All other keys will be placed into the log line. You can use [record_accessor syntax](https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor#syntax). - line_format (default:key_value): format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format `=`. - drop_single_key: if set to true and a record only has 1 key after extracting `` blocks, set the log line to the value and discard the key. +- include_thread_label (default: true): whether or not to include the fluentd_thread label when multiple threads are used for flushing. ### Buffer options