Skip to content

Commit

Permalink
Add multi topic consumer. (apache#100)
Browse files Browse the repository at this point in the history
* Add multi topic consumer.

* Remove unused lock.
  • Loading branch information
cckellogg authored and merlimat committed Nov 15, 2019
1 parent ccd4547 commit 55b9a6f
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 11 deletions.
48 changes: 42 additions & 6 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ import (
log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
)

var ErrConsumerClosed = errors.New("consumer closed")

const defaultNackRedeliveryDelay = 1 * time.Minute

type acker interface {
AckID(id *messageID)
NackID(id *messageID)
}

type consumer struct {
options ConsumerOptions

Expand Down Expand Up @@ -75,18 +79,26 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
topic = options.Topics[0]
}

if _, err := internal.ParseTopicName(topic); err != nil {
if err := validateTopicNames(topic); err != nil {
return nil, err
}

return topicSubscribe(client, options, topic, messageCh)
}

if len(options.Topics) > 1 {
if err := validateTopicNames(options.Topics...); err != nil {
return nil, err
}

return newMultiTopicConsumer(client, options, options.Topics, messageCh)
}

return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage) (Consumer, error) {
func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage) (*consumer, error) {
consumer := &consumer{
messageCh: messageCh,
errorCh: make(chan error),
Expand Down Expand Up @@ -172,6 +184,11 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string,
return consumer, nil
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage) (Consumer, error) {
return internalTopicSubscribe(client, options, topic, messageCh)
}

func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
Expand Down Expand Up @@ -221,6 +238,11 @@ func (c *consumer) AckID(msgID MessageID) {
return
}

if mid.consumer != nil {
mid.Ack()
return
}

partition := mid.partitionIdx
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
Expand All @@ -242,6 +264,11 @@ func (c *consumer) NackID(msgID MessageID) {
return
}

if mid.consumer != nil {
mid.Nack()
return
}

partition := mid.partitionIdx
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
Expand All @@ -265,13 +292,22 @@ func (c *consumer) Close() {
wg.Wait()
}

var random = rand.New(rand.NewSource(time.Now().UnixNano()))
var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
}

type random struct {
sync.Mutex
R *rand.Rand
}

func generateRandomName() string {
r.Lock()
defer r.Unlock()
chars := "abcdefghijklmnopqrstuvwxyz"
bytes := make([]byte, 5)
for i := range bytes {
bytes[i] = chars[random.Intn(len(chars))]
bytes[i] = chars[r.R.Intn(len(chars))]
}
return string(bytes)
}
Expand Down
189 changes: 189 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"context"
"fmt"
"sync"

pkgerrors "github.com/pkg/errors"

log "github.com/sirupsen/logrus"
)

type multiTopicConsumer struct {
options ConsumerOptions

messageCh chan ConsumerMessage

consumers map[string]*consumer

closeOnce sync.Once
closeCh chan struct{}

log *log.Entry
}

func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage) (Consumer, error) {
mtc := &multiTopicConsumer{
options: options,
messageCh: messageCh,
consumers: make(map[string]*consumer, len(topics)),
closeCh: make(chan struct{}),
log: &log.Entry{},
}

type ConsumerError struct {
err error
topic string
consumer *consumer
}

var wg sync.WaitGroup
wg.Add(len(topics))
ch := make(chan ConsumerError, len(topics))
for i := range topics {
go func(t string) {
defer wg.Done()
c, err := internalTopicSubscribe(client, options, t, messageCh)
ch <- ConsumerError{
err: err,
topic: t,
consumer: c,
}
}(topics[i])
}

go func() {
wg.Wait()
close(ch)
}()

var errs error
for ce := range ch {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s subscription=%s",
ce.topic, options.SubscriptionName)
} else {
mtc.consumers[ce.topic] = ce.consumer
}
}

if errs != nil {
for _, c := range mtc.consumers {
c.Close()
}
return nil, errs
}

return mtc, nil
}

func (c *multiTopicConsumer) Subscription() string {
return c.options.SubscriptionName
}

func (c *multiTopicConsumer) Unsubscribe() error {
var errs error
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
msg := fmt.Sprintf("unable to unsubscribe from topic=%s subscription=%s",
consumer.options.Topic, c.Subscription())
errs = pkgerrors.Wrap(err, msg)
}
}
return errs
}

func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
case <-c.closeCh:
return nil, ErrConsumerClosed
case cm, ok := <-c.messageCh:
if !ok {
return nil, ErrConsumerClosed
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

// Messages
func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}

// Ack the consumption of a single message
func (c *multiTopicConsumer) Ack(msg Message) {
c.AckID(msg.ID())
}

// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(*messageID)
if !ok {
c.log.Warnf("invalid message id type")
return
}

if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
return
}

mid.Ack()
}

func (c *multiTopicConsumer) Nack(msg Message) {
c.AckID(msg.ID())
}

func (c *multiTopicConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(*messageID)
if !ok {
c.log.Warnf("invalid message id type")
return
}

if mid.consumer == nil {
c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
return
}

mid.Nack()
}

func (c *multiTopicConsumer) Close() {
c.closeOnce.Do(func() {
var wg sync.WaitGroup
wg.Add(len(c.consumers))
for _, con := range c.consumers {
go func(consumer *consumer) {
defer wg.Done()
consumer.Close()
}(con)
}
wg.Wait()
close(c.closeCh)
})
}
93 changes: 93 additions & 0 deletions pulsar/consumer_multitopic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"context"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMultiTopicConsumerReceive(t *testing.T) {
topic1 := newTopicName()
topic2 := newTopicName()

client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
t.Fatal(err)
}
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

// produce messages
for i, topic := range topics {
if err := produceHelloMessages(client, topic, 5, fmt.Sprintf("topic-%d", i+1)); err != nil {
t.Fatal(err)
}
}

receivedTopic1 := 0
receivedTopic2 := 0
for receivedTopic1+receivedTopic2 < 10 {
select {
case cm := <-consumer.Chan():
msg := string(cm.Payload())
if strings.HasPrefix(msg, "topic-1") {
receivedTopic1++
} else if strings.HasPrefix(msg, "topic-2") {
receivedTopic2++
}
consumer.Ack(cm.Message)
}
}
assert.Equal(t, receivedTopic1, receivedTopic2)
}

func produceHelloMessages(client Client, topic string, numMessages int, prefix string) error {
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
return err
}
defer p.Close()
ctx := context.Background()
for i := 0; i < numMessages; i++ {
m := &ProducerMessage{
Payload: []byte(fmt.Sprintf("%s-hello-%d", prefix, i)),
}
if err := p.Send(ctx, m); err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit 55b9a6f

Please sign in to comment.