[receiver/kafkareceiver] fix: Kafka receiver blocking shutdown (#35767)
#### Description
Fixes an issue where the Kafka receiver would block on shutdown.

An earlier fix for this issue landed
[here](#32720).
That change does resolve the problem, but it was only applied to the traces
receiver, not to the logs and metrics receivers.

The issue is this goroutine in the `Start()` functions of the logs and
metrics receivers:
```go
go func() {
        if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()
```

The `consumeLoop()` function returns a `context.Canceled` error when
`Shutdown()` is called, which is expected. However,
`componentstatus.ReportStatus()` blocks while attempting to report this
error. The underlying bug that causes the blocking is described
[here](open-telemetry/opentelemetry-collector#9824).

The previously mentioned PR fixed this for the traces receiver by
checking if the error returned by `consumeLoop()` is `context.Canceled`:
```go
go func() {
	if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()
```

For reference, this is `consumeLoop()` for the traces receiver; the logs
and metrics versions are identical:
```go
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
	for {
		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return ctx.Err()
		}
	}
}
```

That change does fix the issue. However, the only error `consumeLoop()`
can actually return is a canceled context, because the context it receives
is created from `context.Background()`:
```go
ctx, cancel := context.WithCancel(context.Background())
```
This context is only used by `consumeLoop()` and the cancel function is
only called in `Shutdown()`.

Because `consumeLoop()` can only return a `context.Canceled` error, this
PR removes the now-unnecessary error handling from the logs, metrics, and
traces receivers. `consumeLoop()` still logs the `context.Canceled` error,
but it no longer returns an error, and the goroutine simply calls
`consumeLoop()` directly.

Additional motivation for removing the call to
`componentstatus.ReportStatus()`: the documentation for the underlying
`componentstatus.Report()` function states that it does not need to be
called during `Start()` or `Shutdown()`, because the service already
reports status for the component ([comment
here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/componentstatus/status.go#L21-L25)).
So even if the blocking bug did not exist, the component still should not
make this call, since it would only ever be made during `Shutdown()`.

#### Link to tracking issue
Fixes #30789

#### Testing
Tested with a build of the collector containing these changes, scraping
logs from a Kafka instance. When the collector was stopped and
`Shutdown()` was called, the receiver did not block and the collector
stopped gracefully as expected.
dpaasman00 authored Oct 22, 2024
1 parent 8d9f682 commit 58a77db
Showing 5 changed files with 68 additions and 32 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-kafka-recv-blocking-shutdown.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fixes issue causing kafkareceiver to block during Shutdown().

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30789]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 0 additions & 1 deletion receiver/kafkareceiver/go.mod
@@ -17,7 +17,6 @@ require (
github.com/openzipkin/zipkin-go v0.4.3
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae
go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae
go.opentelemetry.io/collector/config/configtelemetry v0.111.1-0.20241008154146-ea48c09c31ae
go.opentelemetry.io/collector/config/configtls v1.17.1-0.20241008154146-ea48c09c31ae
go.opentelemetry.io/collector/confmap v1.17.1-0.20241008154146-ea48c09c31ae
2 changes: 0 additions & 2 deletions receiver/kafkareceiver/go.sum

47 changes: 24 additions & 23 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -5,14 +5,12 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect

import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"github.com/IBM/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -44,6 +42,7 @@ type kafkaTracesConsumer struct {
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaler TracesUnmarshaler
consumeLoopWG *sync.WaitGroup

settings receiver.Settings
telemetryBuilder *metadata.TelemetryBuilder
@@ -65,6 +64,7 @@ type kafkaMetricsConsumer struct {
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaler MetricsUnmarshaler
consumeLoopWG *sync.WaitGroup

settings receiver.Settings
telemetryBuilder *metadata.TelemetryBuilder
@@ -86,6 +86,7 @@ type kafkaLogsConsumer struct {
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaler LogsUnmarshaler
consumeLoopWG *sync.WaitGroup

settings receiver.Settings
telemetryBuilder *metadata.TelemetryBuilder
@@ -113,6 +114,7 @@ func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consum
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
consumeLoopWG: &sync.WaitGroup{},
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
@@ -207,16 +209,14 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
headers: c.headers,
}
}
go func() {
if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
c.consumeLoopWG.Add(1)
go c.consumeLoop(ctx, consumerGroup)
<-consumerGroup.ready
return nil
}

func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
defer c.consumeLoopWG.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
@@ -227,7 +227,7 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.Co
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
return
}
}
}
@@ -237,6 +237,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
return nil
}
c.cancelConsumeLoop()
c.consumeLoopWG.Wait()
if c.consumerGroup == nil {
return nil
}
@@ -253,6 +254,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consu
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
consumeLoopWG: &sync.WaitGroup{},
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
@@ -315,16 +317,14 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
headers: c.headers,
}
}
go func() {
if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
c.consumeLoopWG.Add(1)
go c.consumeLoop(ctx, metricsConsumerGroup)
<-metricsConsumerGroup.ready
return nil
}

func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
defer c.consumeLoopWG.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
@@ -335,7 +335,7 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.C
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
return
}
}
}
@@ -345,6 +345,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
return nil
}
c.cancelConsumeLoop()
c.consumeLoopWG.Wait()
if c.consumerGroup == nil {
return nil
}
@@ -361,6 +362,7 @@ func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
consumeLoopWG: &sync.WaitGroup{},
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
@@ -426,16 +428,14 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
headers: c.headers,
}
}
go func() {
if err := c.consumeLoop(ctx, logsConsumerGroup); err != nil {
componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
}
}()
c.consumeLoopWG.Add(1)
go c.consumeLoop(ctx, logsConsumerGroup)
<-logsConsumerGroup.ready
return nil
}

func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
defer c.consumeLoopWG.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
@@ -446,7 +446,7 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.Cons
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
return
}
}
}
@@ -456,6 +456,7 @@ func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
return nil
}
c.cancelConsumeLoop()
c.consumeLoopWG.Wait()
if c.consumerGroup == nil {
return nil
}
23 changes: 17 additions & 6 deletions receiver/kafkareceiver/kafka_receiver_test.go
@@ -96,6 +96,7 @@ func TestTracesReceiverStart(t *testing.T) {
c := kafkaTracesConsumer{
config: Config{Encoding: defaultEncoding},
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
telemetryBuilder: nopTelemetryBuilder(t),
@@ -110,18 +111,19 @@ func TestTracesReceiverStartConsume(t *testing.T) {
require.NoError(t, err)
c := kafkaTracesConsumer{
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
telemetryBuilder: telemetryBuilder,
}
ctx, cancelFunc := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancelFunc
require.NoError(t, c.Shutdown(context.Background()))
err = c.consumeLoop(ctx, &tracesConsumerGroupHandler{
c.consumeLoopWG.Add(1)
c.consumeLoop(ctx, &tracesConsumerGroupHandler{
ready: make(chan bool),
telemetryBuilder: telemetryBuilder,
})
assert.EqualError(t, err, context.Canceled.Error())
}

func TestTracesReceiver_error(t *testing.T) {
@@ -134,6 +136,7 @@ func TestTracesReceiver_error(t *testing.T) {
c := kafkaTracesConsumer{
config: Config{Encoding: defaultEncoding},
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
telemetryBuilder: nopTelemetryBuilder(t),
@@ -375,6 +378,7 @@ func TestTracesReceiver_encoding_extension(t *testing.T) {
c := kafkaTracesConsumer{
config: Config{Encoding: "traces_encoding"},
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
telemetryBuilder: nopTelemetryBuilder(t),
@@ -449,18 +453,19 @@ func TestMetricsReceiverStartConsume(t *testing.T) {
require.NoError(t, err)
c := kafkaMetricsConsumer{
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
telemetryBuilder: telemetryBuilder,
}
ctx, cancelFunc := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancelFunc
require.NoError(t, c.Shutdown(context.Background()))
err = c.consumeLoop(ctx, &logsConsumerGroupHandler{
c.consumeLoopWG.Add(1)
c.consumeLoop(ctx, &logsConsumerGroupHandler{
ready: make(chan bool),
telemetryBuilder: telemetryBuilder,
})
assert.EqualError(t, err, context.Canceled.Error())
}

func TestMetricsReceiver_error(t *testing.T) {
@@ -473,6 +478,7 @@ func TestMetricsReceiver_error(t *testing.T) {
c := kafkaMetricsConsumer{
config: Config{Encoding: defaultEncoding},
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
telemetryBuilder: nopTelemetryBuilder(t),
@@ -712,6 +718,7 @@ func TestMetricsReceiver_encoding_extension(t *testing.T) {
c := kafkaMetricsConsumer{
config: Config{Encoding: "metrics_encoding"},
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
telemetryBuilder: nopTelemetryBuilder(t),
@@ -787,6 +794,7 @@ func TestLogsReceiverStart(t *testing.T) {
c := kafkaLogsConsumer{
config: *createDefaultConfig().(*Config),
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
telemetryBuilder: nopTelemetryBuilder(t),
@@ -801,18 +809,19 @@ func TestLogsReceiverStartConsume(t *testing.T) {
require.NoError(t, err)
c := kafkaLogsConsumer{
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
telemetryBuilder: telemetryBuilder,
}
ctx, cancelFunc := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancelFunc
require.NoError(t, c.Shutdown(context.Background()))
err = c.consumeLoop(ctx, &logsConsumerGroupHandler{
c.consumeLoopWG.Add(1)
c.consumeLoop(ctx, &logsConsumerGroupHandler{
ready: make(chan bool),
telemetryBuilder: telemetryBuilder,
})
assert.EqualError(t, err, context.Canceled.Error())
}

func TestLogsReceiver_error(t *testing.T) {
@@ -824,6 +833,7 @@ func TestLogsReceiver_error(t *testing.T) {
expectedErr := errors.New("handler error")
c := kafkaLogsConsumer{
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
config: *createDefaultConfig().(*Config),
@@ -1188,6 +1198,7 @@ func TestLogsReceiver_encoding_extension(t *testing.T) {
c := kafkaLogsConsumer{
config: Config{Encoding: "logs_encoding"},
nextConsumer: consumertest.NewNop(),
consumeLoopWG: &sync.WaitGroup{},
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
telemetryBuilder: nopTelemetryBuilder(t),