Skip to content
This repository has been archived by the owner on Oct 25, 2021. It is now read-only.

Commit

Permalink
Fix n-consumers per topic
Browse files Browse the repository at this point in the history
  • Loading branch information
maestre3d committed Dec 31, 2020
1 parent 2dc29e0 commit 65c6822
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
18 changes: 10 additions & 8 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,17 @@ func (b *Broker) Serve() error {
}

func (b *Broker) startNodes(ctx context.Context) error {
for _, c := range b.EventMux.List() {
nodeCtx := ctx
n := newNode(b, c)
if err := n.Consume(nodeCtx); err != nil {
return err
for _, consumers := range b.EventMux.List() {
for _, c := range consumers {
nodeCtx := ctx
n := newNode(b, c)
if err := n.Consume(nodeCtx); err != nil {
return err
}
b.nodes[b.runningNodes] = n
b.runningWorkers += n.runningWorkers.Length()
b.runningNodes++
}
b.nodes[b.runningNodes] = n
b.runningWorkers += n.runningWorkers.Length()
b.runningNodes++
}
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ type EventMux interface {
// Add manually adds a consumer
Add(c *Consumer)
// Get returns an specific consumer by the given topic
Get(topic string) *Consumer
Get(topic string) []*Consumer
// Del removes an specific topic from the local registry
Del(topic string)
// Contains verifies if the given topic exists in the local registry
Contains(topic string) bool
// List returns the local registry
List() map[string]*Consumer
List() map[string][]*Consumer
}

type defaultMux struct {
consumers map[string]*Consumer
consumers map[string][]*Consumer
mu sync.RWMutex
}

// NewMux allocates and returns a default EventMux
func NewMux() EventMux {
return &defaultMux{
consumers: map[string]*Consumer{},
consumers: map[string][]*Consumer{},
mu: sync.RWMutex{},
}
}
Expand All @@ -42,7 +42,7 @@ func (b *defaultMux) Topic(topic string) *Consumer {
return c
}
c.Topic(topic)
b.consumers[topic] = c
b.consumers[topic] = append(b.consumers[topic], c)
return c
}

Expand All @@ -57,7 +57,7 @@ func (b *defaultMux) Topics(topics ...string) *Consumer {
// b.consumers.Store(c.TopicString(), c) - previous version, attached many topics into a single unit of work
// Adds a worker pool per-topic
for _, t := range topics {
b.consumers[t] = c
b.consumers[t] = append(b.consumers[t], c)
}
return c
}
Expand All @@ -69,11 +69,11 @@ func (b *defaultMux) Add(c *Consumer) {
return // ignore nil-refs
}
for _, t := range c.topics {
b.consumers[t] = c
b.consumers[t] = append(b.consumers[t], c)
}
}

func (b *defaultMux) Get(topic string) *Consumer {
func (b *defaultMux) Get(topic string) []*Consumer {
b.mu.RLock()
defer b.mu.RUnlock()
c, ok := b.consumers[topic]
Expand All @@ -97,7 +97,7 @@ func (b *defaultMux) Contains(topic string) bool {
return ok
}

func (b *defaultMux) List() map[string]*Consumer {
func (b *defaultMux) List() map[string][]*Consumer {
b.mu.RLock()
defer b.mu.RUnlock()
return b.consumers
Expand Down
12 changes: 8 additions & 4 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ func TestWorker(t *testing.T) {
assert.Nil(t, err)
b.Topic("foo.1")
var n *node
for _, c := range b.EventMux.List() {
n = newNode(b, c)
for _, consumers := range b.EventMux.List() {
for _, c := range consumers {
n = newNode(b, c)
}
}
assert.NotNil(t, n)
n.Consumer.Provider(tt.n)
Expand All @@ -57,8 +59,10 @@ func BenchmarkWorker(b *testing.B) {
}
br.Topic("foo.1")
var n *node
for _, c := range br.EventMux.List() {
n = newNode(br, c)
for _, consumers := range br.EventMux.List() {
for _, c := range consumers {
n = newNode(br, c)
}
}
for i := 0; i < b.N; i++ {
w := newWorker(n)
Expand Down

0 comments on commit 65c6822

Please sign in to comment.