diff --git a/.chloggen/fix-kafka-recv-blocking-shutdown.yaml b/.chloggen/fix-kafka-recv-blocking-shutdown.yaml new file mode 100644 index 000000000000..d48a75cfe954 --- /dev/null +++ b/.chloggen/fix-kafka-recv-blocking-shutdown.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fixes issue causing kafkareceiver to block during Shutdown(). + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30789] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index d14e2e8f35e1..6da4d67c9d37 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -17,7 +17,6 @@ require ( github.com/openzipkin/zipkin-go v0.4.3 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae - go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/config/configtelemetry v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/config/configtls v1.17.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/confmap v1.17.1-0.20241008154146-ea48c09c31ae diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index 72c2336349c1..92dca05b53e2 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -126,8 +126,6 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t go.opentelemetry.io/collector v0.111.0 h1:D3LJTYrrK2ac94E2PXPSbVkArqxbklbCLsE4MAJQdRo= go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae h1:dXAMqXGJp1vWG7qwS/2sjIyJgmyOSfEOm6Gcmkzp1cQ= go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae/go.mod h1:iWUfPxpVwZhkI4v3/Gh5wt4iKyJn4lriPFAug8iLXno= -go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae h1:BVTz/s8fmI5UA4Q6zAndl0Pds4RrkhxEXkx9TMelleM= -go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae/go.mod h1:fwY2NdXkOw07ObusogFZChVyyvqXqCJBlGaWwwDJAtI= go.opentelemetry.io/collector/config/configopaque v1.17.1-0.20241008154146-ea48c09c31ae h1:Mh1ZBO6U5X8iXGFBTguBvLBydg+aLuoWX0Ij7QzHU3c= go.opentelemetry.io/collector/config/configopaque v1.17.1-0.20241008154146-ea48c09c31ae/go.mod h1:6zlLIyOoRpJJ+0bEKrlZOZon3rOp5Jrz9fMdR4twOS4= go.opentelemetry.io/collector/config/configretry v1.17.1-0.20241008154146-ea48c09c31ae h1:2iWFdlGM2sRZd2WBrivau/cBZzc5iZMgHFcheoh1xvM= diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 750955366816..48ea87559a56 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -5,14 +5,12 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" - "errors" "fmt" "strconv" "sync" "github.com/IBM/sarama" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -44,6 +42,7 @@ type kafkaTracesConsumer struct { topics []string cancelConsumeLoop context.CancelFunc unmarshaler TracesUnmarshaler + consumeLoopWG *sync.WaitGroup settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder @@ -65,6 +64,7 @@ type kafkaMetricsConsumer struct { topics []string cancelConsumeLoop context.CancelFunc unmarshaler MetricsUnmarshaler + consumeLoopWG *sync.WaitGroup settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder @@ -86,6 +86,7 @@ type kafkaLogsConsumer struct { topics []string cancelConsumeLoop context.CancelFunc unmarshaler LogsUnmarshaler + consumeLoopWG *sync.WaitGroup settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder @@ -113,6 +114,7 @@ func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consum config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, + consumeLoopWG: &sync.WaitGroup{}, settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, @@ -207,16 +209,14 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro headers: c.headers, } } - go func() { - if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - } - }() + c.consumeLoopWG.Add(1) + go c.consumeLoop(ctx, consumerGroup) <-consumerGroup.ready return nil } -func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { + defer c.consumeLoopWG.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -227,7 +227,7 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.Co // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return ctx.Err() + return } } } @@ -237,6 +237,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error { return nil } c.cancelConsumeLoop() + c.consumeLoopWG.Wait() if c.consumerGroup == nil { return nil } @@ -253,6 +254,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consu config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, + consumeLoopWG: &sync.WaitGroup{}, settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, @@ -315,16 +317,14 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err headers: c.headers, } } - go func() { - if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - } - }() + c.consumeLoopWG.Add(1) + go c.consumeLoop(ctx, metricsConsumerGroup) <-metricsConsumerGroup.ready return nil } -func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { + defer c.consumeLoopWG.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -335,7 +335,7 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.C // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return ctx.Err() + return } } } @@ -345,6 +345,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error { return nil } c.cancelConsumeLoop() + c.consumeLoopWG.Wait() if c.consumerGroup == nil { return nil } @@ -361,6 +362,7 @@ func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, + consumeLoopWG: &sync.WaitGroup{}, settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, @@ -426,16 +428,14 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error headers: c.headers, } } - go func() { - if err := c.consumeLoop(ctx, logsConsumerGroup); err != nil { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - } - }() + c.consumeLoopWG.Add(1) + go c.consumeLoop(ctx, logsConsumerGroup) <-logsConsumerGroup.ready return nil } -func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { + defer c.consumeLoopWG.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -446,7 +446,7 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.Cons // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return ctx.Err() + return } } } @@ -456,6 +456,7 @@ func (c *kafkaLogsConsumer) Shutdown(context.Context) error { return nil } c.cancelConsumeLoop() + c.consumeLoopWG.Wait() if c.consumerGroup == nil { return nil } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 529196efe26e..61693d58eced 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -96,6 +96,7 @@ func TestTracesReceiverStart(t *testing.T) { c := kafkaTracesConsumer{ config: Config{Encoding: defaultEncoding}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: nopTelemetryBuilder(t), @@ -110,6 +111,7 @@ func TestTracesReceiverStartConsume(t *testing.T) { require.NoError(t, err) c := kafkaTracesConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: telemetryBuilder, @@ -117,11 +119,11 @@ func TestTracesReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err = c.consumeLoop(ctx, &tracesConsumerGroupHandler{ + c.consumeLoopWG.Add(1) + c.consumeLoop(ctx, &tracesConsumerGroupHandler{ ready: make(chan bool), telemetryBuilder: telemetryBuilder, }) - assert.EqualError(t, err, context.Canceled.Error()) } func TestTracesReceiver_error(t *testing.T) { @@ -134,6 +136,7 @@ func TestTracesReceiver_error(t *testing.T) { c := kafkaTracesConsumer{ config: Config{Encoding: defaultEncoding}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -375,6 +378,7 @@ func TestTracesReceiver_encoding_extension(t *testing.T) { c := kafkaTracesConsumer{ config: Config{Encoding: "traces_encoding"}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -449,6 +453,7 @@ func TestMetricsReceiverStartConsume(t *testing.T) { require.NoError(t, err) c := kafkaMetricsConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: telemetryBuilder, @@ -456,11 +461,11 @@ func TestMetricsReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ + c.consumeLoopWG.Add(1) + c.consumeLoop(ctx, &logsConsumerGroupHandler{ ready: make(chan bool), telemetryBuilder: telemetryBuilder, }) - assert.EqualError(t, err, context.Canceled.Error()) } func TestMetricsReceiver_error(t *testing.T) { @@ -473,6 +478,7 @@ func TestMetricsReceiver_error(t *testing.T) { c := kafkaMetricsConsumer{ config: Config{Encoding: defaultEncoding}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -712,6 +718,7 @@ func TestMetricsReceiver_encoding_extension(t *testing.T) { c := kafkaMetricsConsumer{ config: Config{Encoding: "metrics_encoding"}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -787,6 +794,7 @@ func TestLogsReceiverStart(t *testing.T) { c := kafkaLogsConsumer{ config: *createDefaultConfig().(*Config), nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: nopTelemetryBuilder(t), @@ -801,6 +809,7 @@ func TestLogsReceiverStartConsume(t *testing.T) { require.NoError(t, err) c := kafkaLogsConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: telemetryBuilder, @@ -808,11 +817,11 @@ func TestLogsReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ + c.consumeLoopWG.Add(1) + c.consumeLoop(ctx, &logsConsumerGroupHandler{ ready: make(chan bool), telemetryBuilder: telemetryBuilder, }) - assert.EqualError(t, err, context.Canceled.Error()) } func TestLogsReceiver_error(t *testing.T) { @@ -824,6 +833,7 @@ func TestLogsReceiver_error(t *testing.T) { expectedErr := errors.New("handler error") c := kafkaLogsConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, config: *createDefaultConfig().(*Config), @@ -1188,6 +1198,7 @@ func TestLogsReceiver_encoding_extension(t *testing.T) { c := kafkaLogsConsumer{ config: Config{Encoding: "logs_encoding"}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t),