Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/kafka]Decide the Kafka topic based on the value of the attribute. #31809

Merged
merged 26 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
88f8227
Decide the Kafka topic based on the value of the attribute.
pyama86 Mar 18, 2024
1148f19
Stop making unnecessary assertions
pyama86 Mar 18, 2024
881dfea
In most cases, it is better not to have missing logs.
pyama86 Mar 18, 2024
441556d
add change log
pyama86 Mar 19, 2024
28c28ac
Merge branch 'main' into attribute-topic
pyama86 Mar 19, 2024
9b92821
I want to become proficient in English.
pyama86 Mar 31, 2024
f27ac4e
Check for the presence of spans, and if none exist, set a default topic.
pyama86 Mar 31, 2024
1a28d5b
Merge branch 'attribute-topic' of github.com:pyama86/opentelemetry-co…
pyama86 Mar 31, 2024
6ba26f4
Use the first valid attribute that can be obtained.
pyama86 Mar 31, 2024
1da7629
add readme for topic_from_attribute
pyama86 Mar 31, 2024
0079c09
Using interfaces allows for DRY (Don't Repeat Yourself) implementation.
pyama86 Apr 7, 2024
7fd0865
What's related to trace is span.
pyama86 Apr 7, 2024
be1efb8
Write an appropriate description.
pyama86 Apr 7, 2024
dfb1a85
There's no need for assignment.
pyama86 Apr 8, 2024
50568fe
Update exporter/kafkaexporter/config.go
pyama86 Apr 9, 2024
70e58e6
Merge branch 'main' into attribute-topic
pyama86 Apr 24, 2024
44272fa
Needs a clearer explanation
pyama86 Apr 26, 2024
3991e9d
Update exporter/kafkaexporter/README.md
pyama86 Apr 28, 2024
141544e
Merge branch 'main' into attribute-topic
codeboten May 1, 2024
99ea82e
Merge branch 'main' into attribute-topic
pyama86 May 2, 2024
5e9785a
fix test interface
pyama86 May 2, 2024
09c53e8
Merge branch 'main' into attribute-topic
pyama86 May 2, 2024
687522a
Merge branch 'main' into attribute-topic
codeboten May 2, 2024
5942c3c
fix lint
pyama86 May 3, 2024
c959fbe
Merge branch 'attribute-topic' of github.com:pyama86/opentelemetry-co…
pyama86 May 3, 2024
27d13a3
Merge branch 'attribute-topic' of github.com:pyama86/opentelemetry-co…
pyama86 May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/attribute-topic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: We have made it possible to determine the topic by using attributes.
pyama86 marked this conversation as resolved.
Show resolved Hide resolved

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31178]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 18 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ type Config struct {
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
Topic string `mapstructure:"topic"`

// TopicFromAttribute is the name of the attribute to use as the topic name.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
pyama86 marked this conversation as resolved.
Show resolved Hide resolved

// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

Expand Down Expand Up @@ -123,6 +127,20 @@ func (cfg *Config) Validate() error {
return validateSASLConfig(cfg.Authentication.SASL)
}

func (cfg *Config) getTopic(attrs pcommon.Map) string {
if cfg.TopicFromAttribute != "" {
rv, ok := attrs.Get(cfg.TopicFromAttribute)
if ok {
topic := rv.Str()
if topic != "" {
return topic
}
}
}

return cfg.Topic
}

func validateSASLConfig(c *kafka.SASLConfig) error {
pyama86 marked this conversation as resolved.
Show resolved Hide resolved
if c == nil {
return nil
Expand Down
63 changes: 63 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

Expand Down Expand Up @@ -334,3 +336,64 @@ func Test_saramaProducerCompressionCodec(t *testing.T) {
})
}
}

func TestConfig_GetTopic(t *testing.T) {
tests := []struct {
name string
cfg Config
resource pcommon.Map
wantTopic string
}{
{
name: "Valid metric attribute, return topic name",
cfg: Config{
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
resource: testdata.GenerateMetricsOneMetric().ResourceMetrics().At(0).Resource().Attributes(),
wantTopic: "resource-attr-val-1",
},
{
name: "Valid trace attribute, return topic name",
cfg: Config{
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
resource: testdata.GenerateTracesOneSpan().ResourceSpans().At(0).Resource().Attributes(),
wantTopic: "resource-attr-val-1",
},
{
name: "Valid log attribute, return topic name",
cfg: Config{
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
resource: testdata.GenerateLogsOneLogRecord().ResourceLogs().At(0).Resource().Attributes(),
wantTopic: "resource-attr-val-1",
},
{
name: "Attribute not found",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
resource: testdata.GenerateMetricsOneMetricNoAttributes().ResourceMetrics().At(0).Resource().Attributes(),
wantTopic: "defaultTopic",
},
{
name: "TopicFromAttribute not set, return default topic",
cfg: Config{
Topic: "defaultTopic",
},
resource: testdata.GenerateMetricsOneMetric().ResourceMetrics().At(0).Resource().Attributes(),
wantTopic: "defaultTopic",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
topic := tt.cfg.getTopic(tt.resource)
assert.Equal(t, tt.wantTopic, topic)
})
}
}
18 changes: 9 additions & 9 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
type kafkaTracesProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler TracesMarshaler
logger *zap.Logger
}
Expand All @@ -41,7 +40,9 @@ func (ke kafkaErrors) Error() string {
}

func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, e.topic)
topic := e.cfg.getTopic(td.ResourceSpans().At(0).Resource().Attributes())
pyama86 marked this conversation as resolved.
Show resolved Hide resolved
pyama86 marked this conversation as resolved.
Show resolved Hide resolved

messages, err := e.marshaler.Marshal(td, topic)
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -78,13 +79,14 @@ func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
type kafkaMetricsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler MetricsMarshaler
logger *zap.Logger
}

func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, e.topic)
topic := e.cfg.getTopic(md.ResourceMetrics().At(0).Resource().Attributes())

messages, err := e.marshaler.Marshal(md, topic)
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -121,13 +123,14 @@ func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error
type kafkaLogsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler LogsMarshaler
logger *zap.Logger
}

func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, e.topic)
topic := e.cfg.getTopic(ld.ResourceLogs().At(0).Resource().Attributes())

messages, err := e.marshaler.Marshal(ld, topic)
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down Expand Up @@ -213,7 +216,6 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
}
return &kafkaMetricsProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
Expand All @@ -234,7 +236,6 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma

return &kafkaTracesProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
Expand All @@ -248,7 +249,6 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[

return &kafkaLogsProducer{
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
}, nil
Expand Down
57 changes: 57 additions & 0 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,25 @@ func TestTracesPusher(t *testing.T) {
require.NoError(t, err)
}

func TestTracesPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaTracesProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.tracesPusher(context.Background(), testdata.GenerateTracesTwoSpansSameResource())
require.NoError(t, err)
}

func TestTracesPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -196,6 +215,25 @@ func TestMetricsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestMetricsDataPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaMetricsProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.metricsDataPusher(context.Background(), testdata.GenerateMetricsTwoMetrics())
require.NoError(t, err)
}

func TestMetricsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -243,6 +281,25 @@ func TestLogsDataPusher(t *testing.T) {
require.NoError(t, err)
}

func TestLogsDataPusher_attr(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaLogsProducer{
cfg: Config{
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.logsDataPusher(context.Background(), testdata.GenerateLogsOneLogRecord())
require.NoError(t, err)
}

func TestLogsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down