Skip to content

Commit

Permalink
[exporter/kafkaexporter] Add encoding extensions support (#34384)
Browse files Browse the repository at this point in the history
**Description:** Add support for encoding extensions in the
kafkaexporter

To be able to use encoding extensions this PR adds extension support and
proposes to rename the existing `encoding` configuration property to
`format` and reusing the `encoding` property for configuring encoding
extensions. Reason is to be consistent with other receivers/exporters.

Related to
#33888
which adds encoding extension support in the `kafkareceiver`.

**Link to tracking Issue:** n/a

**Testing:** Tested via the following configuration.
```
receivers:
  kafka:
    brokers:
    - localhost:29092
    encoding: json
    group_id: test1
    topic: logs_in

extensions:
  json_log_encoding:

exporters:
  debug:
    verbosity: detailed
  kafka:
    brokers:
      - localhost:29092
    encoding: json_log_encoding
    topic: json_out

processors:
  batch:

service:
  extensions: [json_log_encoding]
  pipelines:
    logs:
      receivers: [kafka]
      processors: [batch]
      exporters: [debug, kafka]
  telemetry:
    logs:
      level: "info"
```

Any json can be written to the `logs_in` topic and results be viewed in
the `json_out` topic.

When removing `encoding: json_log_encoding` the default format type is
used and the output in `json_out` topic changes accordingly.

**Documentation:** Updated README.md within the receiver describing the
use of encoding extensions.

Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com>
  • Loading branch information
thmshmm and MovieStoreGuy authored Sep 12, 2024
1 parent c2a2213 commit 21208c0
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 80 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafkaexporter-encoding-extensions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for encoding extensions in the Kafka exporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34384]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This change adds support for encoding extensions in the Kafka exporter. Loading extensions takes precedence over the internally supported encodings.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 3 additions & 12 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newTracesExporter(oCfg, set)
if err != nil {
return nil, err
}
exp := newTracesExporter(oCfg, set)
return exporterhelper.NewTracesExporter(
ctx,
set,
Expand Down Expand Up @@ -136,10 +133,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newMetricsExporter(oCfg, set)
if err != nil {
return nil, err
}
exp := newMetricsExporter(oCfg, set)
return exporterhelper.NewMetricsExporter(
ctx,
set,
Expand Down Expand Up @@ -167,10 +161,7 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newLogsExporter(oCfg, set)
if err != nil {
return nil, err
}
exp := newLogsExporter(oCfg, set)
return exporterhelper.NewLogsExporter(
ctx,
set,
Expand Down
129 changes: 90 additions & 39 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,23 @@ func (e *kafkaTracesProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[ptrace.Marshaler](
host,
e.cfg.Encoding,
); errExt == nil {
e.marshaler = &tracesEncodingMarshaler{
marshaler: *marshaler,
encoding: e.cfg.Encoding,
}
}
if marshaler, errInt := createTracesMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
e.marshaler = marshaler
}
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
Expand Down Expand Up @@ -107,7 +123,23 @@ func (e *kafkaMetricsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[pmetric.Marshaler](
host,
e.cfg.Encoding,
); errExt == nil {
e.marshaler = &metricsEncodingMarshaler{
marshaler: *marshaler,
encoding: e.cfg.Encoding,
}
}
if marshaler, errInt := createMetricMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
e.marshaler = marshaler
}
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
Expand Down Expand Up @@ -149,7 +181,23 @@ func (e *kafkaLogsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[plog.Marshaler](
host,
e.cfg.Encoding,
); errExt == nil {
e.marshaler = &logsEncodingMarshaler{
marshaler: *marshaler,
encoding: e.cfg.Encoding,
}
}
if marshaler, errInt := createLogMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
e.marshaler = marshaler
}
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
Expand Down Expand Up @@ -204,50 +252,26 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
return producer, nil
}

func newMetricsExporter(config Config, set exporter.Settings) (*kafkaMetricsProducer, error) {
marshaler, err := createMetricMarshaler(config)
if err != nil {
return nil, err
}

if marshaler == nil {
return nil, errUnrecognizedEncoding
}

func newMetricsExporter(config Config, set exporter.Settings) *kafkaMetricsProducer {
return &kafkaMetricsProducer{
cfg: config,
marshaler: marshaler,
logger: set.Logger,
}, nil

cfg: config,
logger: set.Logger,
}
}

// newTracesExporter creates Kafka exporter.
func newTracesExporter(config Config, set exporter.Settings) (*kafkaTracesProducer, error) {
marshaler, err := createTracesMarshaler(config)
if err != nil {
return nil, err
}

func newTracesExporter(config Config, set exporter.Settings) *kafkaTracesProducer {
return &kafkaTracesProducer{
cfg: config,
marshaler: marshaler,
logger: set.Logger,
}, nil
}

func newLogsExporter(config Config, set exporter.Settings) (*kafkaLogsProducer, error) {
marshaler, err := createLogMarshaler(config)
if err != nil {
return nil, err
cfg: config,
logger: set.Logger,
}
}

func newLogsExporter(config Config, set exporter.Settings) *kafkaLogsProducer {
return &kafkaLogsProducer{
cfg: config,
marshaler: marshaler,
logger: set.Logger,
}, nil

cfg: config,
logger: set.Logger,
}
}

type resourceSlice[T any] interface {
Expand All @@ -271,3 +295,30 @@ func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string {
}
return cfg.Topic
}

// loadEncodingExtension tries to load an available extension for the given encoding.
func loadEncodingExtension[T any](host component.Host, encoding string) (*T, error) {
extensionID, err := encodingToComponentID(encoding)
if err != nil {
return nil, err
}
encodingExtension, ok := host.GetExtensions()[*extensionID]
if !ok {
return nil, fmt.Errorf("unknown encoding extension %q", encoding)
}
unmarshaler, ok := encodingExtension.(T)
if !ok {
return nil, fmt.Errorf("extension %q is not an unmarshaler", encoding)
}
return &unmarshaler, nil
}

// encodingToComponentID converts an encoding string to a component ID using the given encoding as type.
func encodingToComponentID(encoding string) (*component.ID, error) {
componentType, err := component.NewType(encoding)
if err != nil {
return nil, fmt.Errorf("invalid component type: %w", err)
}
id := component.NewID(componentType)
return &id, nil
}
Loading

0 comments on commit 21208c0

Please sign in to comment.