Skip to content

Commit

Permalink
Extend kafkareceiver configuration capabilities (open-telemetry#5677)
Browse files Browse the repository at this point in the history
* [kafkareceiver] Configure autocommit

Make the autocommit functionality configurable, giving the option to
disable it or to change the commit interval.

* [kafkareceiver] Configure message marking

Make the time and condition of the message marking configurable.
The messages can be marked before or after they are processed by
the pipeline, and the marking can depend on the processing result.

* [kafkareceiver] Update README

* Fix spelling in receiver/kafkareceiver/config.go

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>

* Update receiver/kafkareceiver/config.go

Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com>

* Update receiver/kafkareceiver/README.md

Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com>

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>
Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 15, 2021
1 parent 50482cd commit 91b03e6
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 23 deletions.
7 changes: 7 additions & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ The following settings can be optionally configured:
- `retry`
- `max` (default = 3): The number of retries to get metadata
- `backoff` (default = 250ms): How long to wait between metadata retries
- `autocommit`
  - `enable` (default = true): Whether or not to auto-commit updated offsets back to the broker
  - `interval` (default = 1s): How frequently to commit updated offsets. Ineffective unless auto-commit is enabled
- `message_marking`
  - `after` (default = false): If true, the messages are marked after the pipeline execution
  - `on_error` (default = false): If false, only the successfully processed messages are marked; has no effect unless `after` is true
  **Note: this can block the entire partition in case a message processing returns a permanent error**

Example:

Expand Down
28 changes: 28 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,33 @@
package kafkareceiver

import (
"time"

"go.opentelemetry.io/collector/config"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
)

// AutoCommit controls the consumer's auto-commit behaviour for offsets
// that have been marked as processed (mapped onto
// sarama Config.Consumer.Offsets.AutoCommit in newMetricsReceiver).
type AutoCommit struct {
	// Enable toggles auto-committing updated offsets back to the broker
	// (default = true). When disabled, the consumer group handlers commit
	// explicitly after each marked message instead.
	Enable bool `mapstructure:"enable"`
	// Interval is how frequently updated offsets are committed.
	// Ineffective unless Enable is true (default = 1s).
	Interval time.Duration `mapstructure:"interval"`
}

// MessageMarking controls when a consumed Kafka message is marked as
// processed and therefore becomes eligible for offset commit.
type MessageMarking struct {
	// After, when true, marks messages only after the pipeline has finished
	// processing them; when false (the default), messages are marked as soon
	// as they are claimed, before processing.
	After bool `mapstructure:"after"`

	// OnError, when true, marks messages even when unmarshaling or pipeline
	// processing returns an error. If false, only successfully processed
	// messages are marked. It has no impact if After is set to false.
	// Note: leaving this false can block the entire partition in case a
	// message processing returns a permanent error.
	OnError bool `mapstructure:"on_error"`
}

// Config defines configuration for Kafka receiver.
type Config struct {
config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Expand All @@ -41,6 +63,12 @@ type Config struct {
Metadata kafkaexporter.Metadata `mapstructure:"metadata"`

Authentication kafkaexporter.Authentication `mapstructure:"auth"`

// Controls the auto-commit functionality
AutoCommit AutoCommit `mapstructure:"autocommit"`

// Controls the way the messages are marked as consumed
MessageMarking MessageMarking `mapstructure:"message_marking"`
}

var _ config.Receiver = (*Config)(nil)
Expand Down
4 changes: 4 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,9 @@ func TestLoadConfig(t *testing.T) {
Backoff: time.Second * 5,
},
},
AutoCommit: AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
}, r)
}
13 changes: 13 additions & 0 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const (
defaultMetadataRetryBackoff = time.Millisecond * 250
// default from sarama.NewConfig()
defaultMetadataFull = true

// default from sarama.NewConfig()
defaultAutoCommitEnable = true
// default from sarama.NewConfig()
defaultAutoCommitInterval = 1 * time.Second
)

// FactoryOption applies changes to kafkaExporterFactory.
Expand Down Expand Up @@ -107,6 +112,14 @@ func createDefaultConfig() config.Receiver {
Backoff: defaultMetadataRetryBackoff,
},
},
AutoCommit: AutoCommit{
Enable: defaultAutoCommitEnable,
Interval: defaultAutoCommitInterval,
},
MessageMarking: MessageMarking{
After: false,
OnError: false,
},
}
}

Expand Down
127 changes: 106 additions & 21 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type kafkaTracesConsumer struct {
unmarshaler TracesUnmarshaler

settings component.ReceiverCreateSettings

autocommitEnabled bool
messageMarking MessageMarking
}

// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
Expand All @@ -59,6 +62,9 @@ type kafkaMetricsConsumer struct {
unmarshaler MetricsUnmarshaler

settings component.ReceiverCreateSettings

autocommitEnabled bool
messageMarking MessageMarking
}

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
Expand All @@ -71,6 +77,9 @@ type kafkaLogsConsumer struct {
unmarshaler LogsUnmarshaler

settings component.ReceiverCreateSettings

autocommitEnabled bool
messageMarking MessageMarking
}

var _ component.Receiver = (*kafkaTracesConsumer)(nil)
Expand Down Expand Up @@ -103,12 +112,14 @@ func newTracesReceiver(config Config, set component.ReceiverCreateSettings, unma
return nil, err
}
return &kafkaTracesConsumer{
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
settings: set,
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
}, nil
}

Expand All @@ -126,6 +137,8 @@ func (c *kafkaTracesConsumer) Start(context.Context, component.Host) error {
Transport: transport,
ReceiverCreateSettings: c.settings,
}),
autocommitEnabled: c.autocommitEnabled,
messageMarking: c.messageMarking,
}
go c.consumeLoop(ctx, consumerGroup) // nolint:errcheck
<-consumerGroup.ready
Expand Down Expand Up @@ -164,6 +177,9 @@ func newMetricsReceiver(config Config, set component.ReceiverCreateSettings, unm
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval

if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand All @@ -179,12 +195,14 @@ func newMetricsReceiver(config Config, set component.ReceiverCreateSettings, unm
return nil, err
}
return &kafkaMetricsConsumer{
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
settings: set,
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
}, nil
}

Expand All @@ -202,6 +220,8 @@ func (c *kafkaMetricsConsumer) Start(context.Context, component.Host) error {
Transport: transport,
ReceiverCreateSettings: c.settings,
}),
autocommitEnabled: c.autocommitEnabled,
messageMarking: c.messageMarking,
}
go c.consumeLoop(ctx, metricsConsumerGroup)
<-metricsConsumerGroup.ready
Expand Down Expand Up @@ -254,12 +274,14 @@ func newLogsReceiver(config Config, set component.ReceiverCreateSettings, unmars
return nil, err
}
return &kafkaLogsConsumer{
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
settings: set,
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
}, nil
}

Expand All @@ -277,6 +299,8 @@ func (c *kafkaLogsConsumer) Start(context.Context, component.Host) error {
Transport: transport,
ReceiverCreateSettings: c.settings,
}),
autocommitEnabled: c.autocommitEnabled,
messageMarking: c.messageMarking,
}
go c.consumeLoop(ctx, logsConsumerGroup)
<-logsConsumerGroup.ready
Expand Down Expand Up @@ -314,6 +338,9 @@ type tracesConsumerGroupHandler struct {
logger *zap.Logger

obsrecv *obsreport.Receiver

autocommitEnabled bool
messageMarking MessageMarking
}

type metricsConsumerGroupHandler struct {
Expand All @@ -326,6 +353,9 @@ type metricsConsumerGroupHandler struct {
logger *zap.Logger

obsrecv *obsreport.Receiver

autocommitEnabled bool
messageMarking MessageMarking
}

type logsConsumerGroupHandler struct {
Expand All @@ -338,6 +368,9 @@ type logsConsumerGroupHandler struct {
logger *zap.Logger

obsrecv *obsreport.Receiver

autocommitEnabled bool
messageMarking MessageMarking
}

var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
Expand All @@ -361,12 +394,17 @@ func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession

func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
if !c.autocommitEnabled {
defer session.Commit()
}
for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
zap.String("value", string(message.Value)),
zap.Time("timestamp", message.Timestamp),
zap.String("topic", message.Topic))
session.MarkMessage(message, "")
if !c.messageMarking.After {
session.MarkMessage(message, "")
}

ctx := c.obsrecv.StartTracesOp(session.Context())
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
Expand All @@ -378,15 +416,27 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
traces, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}

spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
if !c.autocommitEnabled {
session.Commit()
}
}
return nil
}
Expand All @@ -408,12 +458,18 @@ func (c *metricsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSessio

func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
if !c.autocommitEnabled {
defer session.Commit()
}

for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
zap.String("value", string(message.Value)),
zap.Time("timestamp", message.Timestamp),
zap.String("topic", message.Topic))
session.MarkMessage(message, "")
if !c.messageMarking.After {
session.MarkMessage(message, "")
}

ctx := c.obsrecv.StartMetricsOp(session.Context())
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
Expand All @@ -425,15 +481,27 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
metrics, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}

dataPointCount := metrics.DataPointCount()
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
if !c.autocommitEnabled {
session.Commit()
}
}
return nil
}
Expand All @@ -459,12 +527,17 @@ func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession)

func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
if !c.autocommitEnabled {
defer session.Commit()
}
for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
zap.String("value", string(message.Value)),
zap.Time("timestamp", message.Timestamp),
zap.String("topic", message.Topic))
session.MarkMessage(message, "")
if !c.messageMarking.After {
session.MarkMessage(message, "")
}

ctx := c.obsrecv.StartTracesOp(session.Context())
_ = stats.RecordWithTags(
Expand All @@ -477,15 +550,27 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
logs, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
if !c.autocommitEnabled {
session.Commit()
}
}
return nil
}
Loading

0 comments on commit 91b03e6

Please sign in to comment.