From 20cabf756d2da634db38007e4c8ce130461d76fe Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 28 Dec 2023 20:42:52 -0600 Subject: [PATCH] [exporter/kafka] Add ability to specifiy Kafka Client ID (#30145) **Description:** This P.R adds the ability to specify the Kafka client ID in the Sarama Kafka producer when constructing the client. **Link to tracking Issue:** Resolves 30144 **Testing:** I didn't add any tests because its just propagating a configuration value, and the Sarama client (including its mock) does not expose any mechanism for asserting on which client ID was propagated with each produce request because it happens at the TCP protocol level and is not part of the application level payload. https://github.com/IBM/sarama/blob/main/mocks/mocks.go#L40 **Documentation:** I've updated the documentation and change log to document this feature. --- .chloggen/ra_add-support-client-id.yaml | 27 +++++++++++++++++++++ exporter/kafkaexporter/README.md | 1 + exporter/kafkaexporter/config.go | 4 +++ exporter/kafkaexporter/config_test.go | 3 +++ exporter/kafkaexporter/factory.go | 2 ++ exporter/kafkaexporter/factory_test.go | 1 + exporter/kafkaexporter/kafka_exporter.go | 3 +++ exporter/kafkaexporter/testdata/config.yaml | 1 + 8 files changed, 42 insertions(+) create mode 100755 .chloggen/ra_add-support-client-id.yaml diff --git a/.chloggen/ra_add-support-client-id.yaml b/.chloggen/ra_add-support-client-id.yaml new file mode 100755 index 000000000000..893e6c8c154f --- /dev/null +++ b/.chloggen/ra_add-support-client-id.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "kafkaexporter" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds the ability to configure the Kafka client's Client ID." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30144] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index bd138b0fde56..833309433253 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -26,6 +26,7 @@ The following settings are required: The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers. - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup. +- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests. - `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to. - `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings: - `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index e893d71385b7..1469dded0971 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -32,6 +32,10 @@ type Config struct { // Kafka protocol version ProtocolVersion string `mapstructure:"protocol_version"` + // ClientID to configure the Kafka client with. This can be leveraged by + // Kafka to enforce ACLs, throttling quotas, and more. + ClientID string `mapstructure:"client_id"` + // The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics) Topic string `mapstructure:"topic"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index 4b41c23d9792..70d7a68c3e89 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -58,6 +58,7 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", PartitionTracesByID: true, Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -111,6 +112,7 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", PartitionTracesByID: true, Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -163,6 +165,7 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", PartitionTracesByID: true, Brokers: []string{"foo:123", "bar:456"}, + ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 37f0bc074be8..06fd1feb60ca 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -22,6 +22,7 @@ const ( defaultLogsTopic = "otlp_logs" defaultEncoding = "otlp_proto" defaultBroker = "localhost:9092" + defaultClientID = "sarama" // default from sarama.NewConfig() defaultMetadataRetryMax = 3 // default from sarama.NewConfig() @@ -93,6 +94,7 @@ func createDefaultConfig() component.Config { RetrySettings: exporterhelper.NewDefaultRetrySettings(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), Brokers: []string{defaultBroker}, + ClientID: defaultClientID, // using an empty topic to track when it has not been set by user, default is based on traces or metrics. Topic: "", Encoding: defaultEncoding, diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 74f11af33836..42692c5ab6d0 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -58,6 +58,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.NoError(t, componenttest.CheckConfigStruct(cfg)) assert.Equal(t, []string{defaultBroker}, cfg.Brokers) assert.Equal(t, "", cfg.Topic) + assert.Equal(t, "sarama", cfg.ClientID) } func TestCreateMetricExporter(t *testing.T) { diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 202e4ca8f090..47417f5ee4c9 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -122,6 +122,9 @@ func (e *kafkaLogsProducer) Close(context.Context) error { func newSaramaProducer(config Config) (sarama.SyncProducer, error) { c := sarama.NewConfig() + + c.ClientID = config.ClientID + // These setting are required by the sarama.SyncProducer implementation. c.Producer.Return.Successes = true c.Producer.Return.Errors = true diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 38b38bd5ac13..15624b521b10 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -3,6 +3,7 @@ kafka: brokers: - "foo:123" - "bar:456" + client_id: "test_client_id" metadata: full: false retry: