diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 329c2114193..7d9080b68df 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -75,7 +75,9 @@ func New(params Params) (*Consumer, error) { // Start begins consuming messages in a go routine func (c *Consumer) Start() { c.deadlockDetector.start() + c.doneWg.Add(1) go func() { + defer c.doneWg.Done() c.logger.Info("Starting main loop") for pc := range c.internalConsumer.Partitions() { c.partitionMapLock.Lock() diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 731218b4b19..d6f06eed347 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -79,6 +79,7 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer, saramaClusterConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha)) saramaClusterConsumer.On("Close").Return(nil).Run(func(args mock.Arguments) { mc.Close() + close(pcha) }) saramaClusterConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) return saramaClusterConsumer