Skip to content

Commit

Permalink
[exporter/kafka] Decide the Kafka topic based on the value of the attribute. (#31809)
Browse files Browse the repository at this point in the history

I've implemented the feature based on the discussion in the referenced
issue. I'm assuming it will suffice in most cases if the text attributes
are available, so I haven't planned for arrays or maps.

Fixes #31178

I've implemented unit tests. For the tests related to kafka_exporter, I
followed the existing implementation, but I find it somewhat redundant.
If it's okay, I'd like to switch to table-driven tests.

---------

Co-authored-by: Curtis Robert <crobert@splunk.com>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
3 people authored May 3, 2024
1 parent e2388c2 commit 45ef858
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/attribute-topic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable setting message topics using resource attributes.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31178]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The following settings can be optionally configured:
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Config struct {
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
Topic string `mapstructure:"topic"`

// TopicFromAttribute is the name of the attribute to use as the topic name.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`

// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

Expand Down
35 changes: 26 additions & 9 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -26,7 +27,6 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
// kafkaTracesProducer exports trace data to Kafka via a sarama SyncProducer.
// NOTE(review): this is a rendered diff view; the `topic` field below is shown
// as removed by this commit — the topic is now resolved per-batch via getTopic.
type kafkaTracesProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler TracesMarshaler
logger *zap.Logger
}
Expand All @@ -41,7 +41,7 @@ func (ke kafkaErrors) Error() string {
}

func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, e.topic)
messages, err := e.marshaler.Marshal(td, getTopic(&e.cfg, td.ResourceSpans()))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -78,13 +78,12 @@ func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
// kafkaMetricsProducer exports metric data to Kafka via a sarama SyncProducer.
// NOTE(review): this is a rendered diff view; the `topic` field below is shown
// as removed by this commit — the topic is now resolved per-batch via getTopic.
type kafkaMetricsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler MetricsMarshaler
logger *zap.Logger
}

func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, e.topic)
messages, err := e.marshaler.Marshal(md, getTopic(&e.cfg, md.ResourceMetrics()))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -121,13 +120,12 @@ func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error
// kafkaLogsProducer exports log data to Kafka via a sarama SyncProducer.
// NOTE(review): this is a rendered diff view; the `topic` field below is shown
// as removed by this commit — the topic is now resolved per-batch via getTopic.
type kafkaLogsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler LogsMarshaler
logger *zap.Logger
}

func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, e.topic)
messages, err := e.marshaler.Marshal(ld, getTopic(&e.cfg, ld.ResourceLogs()))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -219,7 +217,6 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m

return &kafkaMetricsProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
Expand All @@ -240,7 +237,6 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma

return &kafkaTracesProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
Expand All @@ -254,9 +250,30 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[

return &kafkaLogsProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil

}

// resourceSlice abstracts the pdata slice types (ptrace.ResourceSpansSlice,
// pmetric.ResourceMetricsSlice, plog.ResourceLogsSlice) so that getTopic can
// walk any of them with a single implementation.
type resourceSlice[T any] interface {
	Len() int
	At(int) T
}

// resource is satisfied by any pdata element that exposes its Resource.
type resource interface {
	Resource() pcommon.Resource
}

// getTopic resolves the Kafka topic for a batch of telemetry. When
// TopicFromAttribute is configured, the first non-empty string value of that
// attribute found across the batch's resources wins; otherwise (option unset,
// attribute absent, or attribute empty/non-string everywhere) the statically
// configured Topic is used.
func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string {
	attrName := cfg.TopicFromAttribute
	if attrName == "" {
		return cfg.Topic
	}
	for i, n := 0, resources.Len(); i < n; i++ {
		if val, found := resources.At(i).Resource().Attributes().Get(attrName); found {
			if topic := val.Str(); topic != "" {
				return topic
			}
		}
	}
	return cfg.Topic
}
126 changes: 126 additions & 0 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,25 @@ func TestTracesPusher(t *testing.T) {
require.NoError(t, err)
}

func TestTracesPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaTracesProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.tracesPusher(context.Background(), testdata.GenerateTraces(2))
require.NoError(t, err)
}

func TestTracesPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -196,6 +215,25 @@ func TestMetricsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestMetricsDataPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaMetricsProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.metricsDataPusher(context.Background(), testdata.GenerateMetrics(2))
require.NoError(t, err)
}

func TestMetricsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -243,6 +281,25 @@ func TestLogsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestLogsDataPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaLogsProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.logsDataPusher(context.Background(), testdata.GenerateLogs(1))
require.NoError(t, err)
}

func TestLogsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -311,3 +368,72 @@ func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMe
func (e logsErrorMarshaler) Encoding() string {
panic("implement me")
}

// Test_GetTopic exercises getTopic across all three signal types and both
// fallback paths (attribute missing, option unset).
func Test_GetTopic(t *testing.T) {
	testCases := []struct {
		name      string
		cfg       Config
		resource  any
		wantTopic string
	}{
		{
			name: "Valid metric attribute, return topic name",
			cfg: Config{
				TopicFromAttribute: "resource-attr",
				Topic:              "defaultTopic",
			},
			resource:  testdata.GenerateMetrics(1).ResourceMetrics(),
			wantTopic: "resource-attr-val-1",
		},
		{
			name: "Valid trace attribute, return topic name",
			cfg: Config{
				TopicFromAttribute: "resource-attr",
				Topic:              "defaultTopic",
			},
			resource:  testdata.GenerateTraces(1).ResourceSpans(),
			wantTopic: "resource-attr-val-1",
		},
		{
			name: "Valid log attribute, return topic name",
			cfg: Config{
				TopicFromAttribute: "resource-attr",
				Topic:              "defaultTopic",
			},
			resource:  testdata.GenerateLogs(1).ResourceLogs(),
			wantTopic: "resource-attr-val-1",
		},
		{
			name: "Attribute not found",
			cfg: Config{
				TopicFromAttribute: "nonexistent_attribute",
				Topic:              "defaultTopic",
			},
			resource:  testdata.GenerateMetrics(1).ResourceMetrics(),
			wantTopic: "defaultTopic",
		},
		{
			name: "TopicFromAttribute not set, return default topic",
			cfg: Config{
				Topic: "defaultTopic",
			},
			resource:  testdata.GenerateMetrics(1).ResourceMetrics(),
			wantTopic: "defaultTopic",
		},
	}

	for _, tc := range testCases {
		tc := tc // pre-Go-1.22 loop-variable capture safety
		t.Run(tc.name, func(t *testing.T) {
			var topic string
			// Dispatch on the concrete slice type; getTopic is generic over it.
			switch r := tc.resource.(type) {
			case pmetric.ResourceMetricsSlice:
				topic = getTopic(&tc.cfg, r)
			case ptrace.ResourceSpansSlice:
				topic = getTopic(&tc.cfg, r)
			case plog.ResourceLogsSlice:
				topic = getTopic(&tc.cfg, r)
			}
			assert.Equal(t, tc.wantTopic, topic)
		})
	}
}

0 comments on commit 45ef858

Please sign in to comment.