diff --git a/broker.go b/broker.go index 6fbff47..9995cdd 100644 --- a/broker.go +++ b/broker.go @@ -120,15 +120,17 @@ func (b *Broker) Serve() error { } func (b *Broker) startNodes(ctx context.Context) error { - for _, c := range b.EventMux.List() { - nodeCtx := ctx - n := newNode(b, c) - if err := n.Consume(nodeCtx); err != nil { - return err + for _, consumers := range b.EventMux.List() { + for _, c := range consumers { + nodeCtx := ctx + n := newNode(b, c) + if err := n.Consume(nodeCtx); err != nil { + return err + } + b.nodes[b.runningNodes] = n + b.runningWorkers += n.runningWorkers.Length() + b.runningNodes++ } - b.nodes[b.runningNodes] = n - b.runningWorkers += n.runningWorkers.Length() - b.runningNodes++ } return nil } diff --git a/mux.go b/mux.go index fb96e7e..5ab30d4 100644 --- a/mux.go +++ b/mux.go @@ -12,24 +12,24 @@ type EventMux interface { // Add manually adds a consumer Add(c *Consumer) // Get returns an specific consumer by the given topic - Get(topic string) *Consumer + Get(topic string) []*Consumer // Del removes an specific topic from the local registry Del(topic string) // Contains verifies if the given topic exists in the local registry Contains(topic string) bool // List returns the local registry - List() map[string]*Consumer + List() map[string][]*Consumer } type defaultMux struct { - consumers map[string]*Consumer + consumers map[string][]*Consumer mu sync.RWMutex } // NewMux allocates and returns a default EventMux func NewMux() EventMux { return &defaultMux{ - consumers: map[string]*Consumer{}, + consumers: map[string][]*Consumer{}, mu: sync.RWMutex{}, } } @@ -42,7 +42,7 @@ func (b *defaultMux) Topic(topic string) *Consumer { return c } c.Topic(topic) - b.consumers[topic] = c + b.consumers[topic] = append(b.consumers[topic], c) return c } @@ -57,7 +57,7 @@ func (b *defaultMux) Topics(topics ...string) *Consumer { // b.consumers.Store(c.TopicString(), c) - previous version, attached many topics into a single unit of work // Adds a worker pool per-topic for _, t := range topics { - b.consumers[t] = c + b.consumers[t] = append(b.consumers[t], c) } return c } @@ -69,11 +69,11 @@ func (b *defaultMux) Add(c *Consumer) { return // ignore nil-refs } for _, t := range c.topics { - b.consumers[t] = c + b.consumers[t] = append(b.consumers[t], c) } } -func (b *defaultMux) Get(topic string) *Consumer { +func (b *defaultMux) Get(topic string) []*Consumer { b.mu.RLock() defer b.mu.RUnlock() c, ok := b.consumers[topic] @@ -97,7 +97,7 @@ func (b *defaultMux) Contains(topic string) bool { return ok } -func (b *defaultMux) List() map[string]*Consumer { +func (b *defaultMux) List() map[string][]*Consumer { b.mu.RLock() defer b.mu.RUnlock() return b.consumers diff --git a/worker_test.go b/worker_test.go index 3768c35..08155e6 100644 --- a/worker_test.go +++ b/worker_test.go @@ -29,8 +29,10 @@ func TestWorker(t *testing.T) { assert.Nil(t, err) b.Topic("foo.1") var n *node - for _, c := range b.EventMux.List() { - n = newNode(b, c) + for _, consumers := range b.EventMux.List() { + for _, c := range consumers { + n = newNode(b, c) + } } assert.NotNil(t, n) n.Consumer.Provider(tt.n) @@ -57,8 +59,10 @@ func BenchmarkWorker(b *testing.B) { } br.Topic("foo.1") var n *node - for _, c := range br.EventMux.List() { - n = newNode(br, c) + for _, consumers := range br.EventMux.List() { + for _, c := range consumers { + n = newNode(br, c) + } } for i := 0; i < b.N; i++ { w := newWorker(n)