Skip to content

Commit 5ce1f52

Browse files
committed
simplify SaramaConsumerGroup.Start method
I believe there was confusion about how this is called or should run. These changes provide equivalent functionality, while being easier to read and understand. There are instructions from the library authors here: https://github.com/IBM/sarama/blob/v1.38.1/consumer_group.go#L19 I think there was confusion around the steps detailed in the link above. So let me clarify my understanding: - Canceling the context passed to Consume is how the implementor of the sarama.ConsumerGroupHandler tells Consume to return. In other words, when we're shutting down, we cancel the context to get Consume to return. I base this off of #4 in the link. - No goroutine needs to be launched on our side; the sarama library will launch the goroutines and those goroutines will call our implementation of ConsumeClaim(). This comes from #3. - The warnings about thread-safety apply to our implementation of ConsumeClaim(). In other words, we need to be careful that s.consumer isn't nil, but since nothing in our code modifies s.consumer after instantiation, we're fine. This comes from #3. - Since our own events.EventConsumer has to be able to respond to Stop() calls, we need to be able to break the for-loop in Start(), which is done by canceling the context that we pass to Consume(). The context.CancelFunc is guarded with a mutex for extra thread-safety, so that if Stop() is called from multiple goroutines, we don't have any race conditions to worry about. We can reason that the previously launched goroutine was unnecessary, as only a single goroutine was being launched, and the same function that launched it then blocked, waiting for it to return before continuing. Without the goroutine, the WaitGroup and stop channel and stopOnce are no longer required.
1 parent a36986e commit 5ce1f52

File tree

1 file changed

+32
-59
lines changed

1 file changed

+32
-59
lines changed

events/consumer_group.go

Lines changed: 32 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package events
22

33
import (
44
"context"
5-
"fmt"
65
"log"
76
"sync"
87

@@ -17,10 +16,10 @@ type SaramaConsumerGroup struct {
1716
config *CloudEventsConfig
1817
consumerGroup sarama.ConsumerGroup
1918
consumer MessageConsumer
20-
stop chan struct{}
21-
stopOnce *sync.Once
2219
topic string
23-
wg *sync.WaitGroup
20+
21+
cancelFuncMu sync.Mutex
22+
cancelFunc context.CancelFunc
2423
}
2524

2625
func NewSaramaConsumerGroup(config *CloudEventsConfig, consumer MessageConsumer) (EventConsumer, error) {
@@ -32,9 +31,6 @@ func NewSaramaConsumerGroup(config *CloudEventsConfig, consumer MessageConsumer)
3231
config: config,
3332
consumer: consumer,
3433
topic: config.GetPrefixedTopic(),
35-
wg: &sync.WaitGroup{},
36-
stop: make(chan struct{}),
37-
stopOnce: &sync.Once{},
3834
}, nil
3935
}
4036

@@ -70,66 +66,43 @@ func (s *SaramaConsumerGroup) Start() error {
7066
return err
7167
}
7268

73-
ctx, cancel := context.WithCancel(context.Background())
74-
go func() {
75-
<-s.stop
76-
cancel()
77-
}()
78-
79-
errChan := make(chan error)
80-
s.wg.Add(1)
81-
go func() {
82-
defer s.wg.Done()
83-
for {
84-
// `Consume` should be called inside an infinite loop, when a
85-
// server-side rebalance happens, the consumer session will need to be
86-
// recreated to get the new claims
87-
if err := s.consumerGroup.Consume(ctx, []string{s.topic}, s); err != nil {
88-
log.Printf("Error from consumer: %v", err)
89-
// It's not clear whether this condition can be true
90-
if err == context.Canceled {
91-
err = ErrConsumerStopped
92-
}
93-
errChan <- err
94-
return
95-
}
96-
// check if context was cancelled, signaling that the consumer should stop
97-
if ctx.Err() != nil {
98-
errChan <- ErrConsumerStopped
99-
return
69+
ctx, cancel := s.newContext()
70+
defer cancel()
71+
72+
for {
73+
// `Consume` should be called inside an infinite loop, when a
74+
// server-side rebalance happens, the consumer session will need to be
75+
// recreated to get the new claims
76+
if err := s.consumerGroup.Consume(ctx, []string{s.topic}, s); err != nil {
77+
log.Printf("Error from consumer: %v", err)
78+
if err == context.Canceled {
79+
return ErrConsumerStopped
10080
}
81+
return err
10182
}
102-
}()
103-
104-
err := <-errChan
105-
if err == ErrConsumerStopped {
106-
return err
107-
}
108-
109-
// The consumer group was terminated with an unexpected error.
110-
// We need to call stop, so we cancel the context and stop the
111-
// go routine so it doesn't leak on restart.
112-
if e := s.Stop(); e != nil {
113-
err = fmt.Errorf("%w: %s", err, e.Error())
11483
}
115-
116-
return err
11784
}
11885

11986
func (s *SaramaConsumerGroup) Stop() error {
120-
// Initialization failed
121-
if s.consumerGroup == nil {
122-
return nil
123-
}
87+
s.cancelFuncMu.Lock()
88+
defer s.cancelFuncMu.Unlock()
12489

125-
// Signal that the consumer group should be terminated
126-
s.stopOnce.Do(func() {
127-
s.stop <- struct{}{}
128-
})
90+
if s.cancelFunc != nil {
91+
s.cancelFunc()
92+
s.cancelFunc = nil
93+
}
94+
return nil
95+
}
12996

130-
// Wait for the consumer group to exit
131-
s.wg.Wait()
132-
return s.consumerGroup.Close()
97+
func (s *SaramaConsumerGroup) newContext() (context.Context, context.CancelFunc) {
98+
s.cancelFuncMu.Lock()
99+
defer s.cancelFuncMu.Unlock()
100+
if s.cancelFunc != nil {
101+
s.cancelFunc()
102+
}
103+
ctx, cancel := context.WithCancel(context.Background())
104+
s.cancelFunc = cancel
105+
return ctx, cancel
133106
}
134107

135108
func (s *SaramaConsumerGroup) initialize() error {

0 commit comments

Comments
 (0)