Skip to content

Commit

Permalink
fix: broker new consumer group run
Browse files Browse the repository at this point in the history
  • Loading branch information
daheige committed Mar 16, 2022
1 parent 3965ac2 commit 951cee6
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 3 deletions.
4 changes: 3 additions & 1 deletion gkafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gkafka
import (
"context"
"errors"
"fmt"
"time"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -87,8 +88,9 @@ func (k *kafkaImpl) Subscribe(ctx context.Context, topic string, groupID string,

consumerGroup, err := sarama.NewConsumerGroupFromClient(opt.Name, k.client)
if err != nil {
return err
panic(fmt.Errorf("new kafka consumer name:%s err:%s", opt.Name, err.Error()))
}

defer func() {
_ = consumerGroup.Close()
}()
Expand Down
4 changes: 3 additions & 1 deletion gpulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gpulsar
import (
"context"
"errors"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -119,8 +120,9 @@ func (p *pulsarImpl) Subscribe(ctx context.Context, topic string, channel string
}

if err != nil {
return err
panic(fmt.Errorf("new pulsar consumer name:%s err:%s", opt.Name, err.Error()))
}

defer consumer.Close()

done := make(chan struct{}, opt.ConcurrencySize)
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ https://pulsar.apache.org/docs/zh-CN/client-libraries-go/

For specific usage, refer to gpulsar/gredis test
kafka consumer groups require Version to be >= V0_10_2_0
if lower than V0_10_2_0, please use go-god/broker v1.1.1
if lower than V0_10_2_0, please use go-god/broker v1.1.2

0 comments on commit 951cee6

Please sign in to comment.