Skip to content

Commit

Permalink
Inject error chan into StartConsuming()
Browse files Browse the repository at this point in the history
  • Loading branch information
wellle committed May 23, 2020
1 parent 80b5328 commit 0c4be35
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 47 deletions.
6 changes: 3 additions & 3 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
count, err = queue.unackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, int64(0))
c.Check(queue.StartConsuming(2, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(2, time.Millisecond, nil), IsNil)
time.Sleep(time.Millisecond)
count, err = queue.unackedCount()
c.Check(err, IsNil)
Expand Down Expand Up @@ -145,7 +145,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
count, err = queue.unackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, int64(0))
c.Check(queue.StartConsuming(2, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(2, time.Millisecond, nil), IsNil)
time.Sleep(time.Millisecond)
count, err = queue.unackedCount()
c.Check(err, IsNil)
Expand Down Expand Up @@ -209,7 +209,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
c.Check(err, IsNil)
queue, err = conn.OpenQueue("q1")
c.Check(err, IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
consumer = NewTestConsumer("c-C")

_, err = queue.AddConsumer("consumer3", consumer)
Expand Down
9 changes: 9 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package rmq

type ConsumeError struct {
RedisErr error
}

func (e *ConsumeError) Error() string {
return "rmq.ConsumeError: " + e.RedisErr.Error()
}
4 changes: 2 additions & 2 deletions example/batch_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
if err != nil {
panic(err)
}
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond); err != nil {
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond, nil); err != nil {
panic(err)
}
if _, err := queue.AddBatchConsumer("things", 111, NewBatchConsumer("things")); err != nil {
Expand All @@ -30,7 +30,7 @@ func main() {
if err != nil {
panic(err)
}
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond); err != nil {
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond, nil); err != nil {
panic(err)
}
if _, err := queue.AddBatchConsumer("balls", 111, NewBatchConsumer("balls")); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion example/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ func main() {
if err != nil {
panic(err)
}
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond); err != nil {

errors := make(chan error, 10)
go func() {
for err := range errors {
log.Print("error: ", err)
}
}()
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond, errors); err != nil {
panic(err)
}

for i := 0; i < numConsumers; i++ {
name := fmt.Sprintf("consumer %d", i)
if _, err := queue.AddConsumer(name, NewConsumer(i)); err != nil {
Expand Down
16 changes: 8 additions & 8 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Queue interface {
Publish(payload ...string) error
PublishBytes(payload ...[]byte) error
SetPushQueue(pushQueue Queue)
StartConsuming(prefetchLimit int64, pollDuration time.Duration) error
StartConsuming(prefetchLimit int64, pollDuration time.Duration, errors chan<- error) error
StopConsuming() <-chan struct{}
AddConsumer(tag string, consumer Consumer) (string, error)
AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error)
Expand Down Expand Up @@ -221,7 +221,7 @@ func (queue *redisQueue) SetPushQueue(pushQueue Queue) {
// StartConsuming starts consuming into a channel of size prefetchLimit
// must be called before consumers can be added!
// pollDuration is the duration the queue sleeps before checking for new deliveries
func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.Duration) error {
func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.Duration, errors chan<- error) error {
if queue.deliveryChan != nil {
return ErrorAlreadyConsuming
}
Expand All @@ -236,7 +236,7 @@ func (queue *redisQueue) StartConsuming(prefetchLimit int64, pollDuration time.D
queue.deliveryChan = make(chan Delivery, prefetchLimit)
atomic.StoreInt32(&queue.consumingStopped, 0)
// log.Printf("rmq queue started consuming %s %d %s", queue, prefetchLimit, pollDuration)
go queue.consume()
go queue.consume(errors)
return nil
}

Expand Down Expand Up @@ -312,7 +312,7 @@ func (queue *redisQueue) addConsumer(tag string) (name string, err error) {

var errorConsumingStopped = fmt.Errorf("consuming stopped") // TODO: move

func (queue *redisQueue) consume() {
func (queue *redisQueue) consume(errors chan<- error) {
errorCount := 0 // number of consecutive batch errors

for {
Expand All @@ -326,10 +326,10 @@ func (queue *redisQueue) consume() {

default: // redis error
errorCount++
// select { // try to add error to channel, but don't block
// case queue.errChan <- &ConsumeError{RedisErr: err, Count: errorCount}:
// default:
// }
select { // try to add error to channel, but don't block
case errors <- &ConsumeError{RedisErr: err}:
default:
}
time.Sleep(queue.pollDuration) // sleep before retry
}
}
Expand Down
30 changes: 15 additions & 15 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (suite *QueueSuite) TestConnectionQueues(c *C) {
queues, err = connection.getConsumingQueues()
c.Check(err, IsNil)
c.Check(queues, HasLen, 0)
c.Check(queue1.StartConsuming(1, time.Millisecond), IsNil)
c.Check(queue1.StartConsuming(1, time.Millisecond, nil), IsNil)
queues, err = connection.getConsumingQueues()
c.Check(err, IsNil)
c.Check(queues, DeepEquals, []string{"conn-q-q1"})
Expand All @@ -98,7 +98,7 @@ func (suite *QueueSuite) TestConnectionQueues(c *C) {
queues, err = connection.getConsumingQueues()
c.Check(err, IsNil)
c.Check(queues, HasLen, 1)
c.Check(queue2.StartConsuming(1, time.Millisecond), IsNil)
c.Check(queue2.StartConsuming(1, time.Millisecond, nil), IsNil)
queues, err = connection.getConsumingQueues()
c.Check(err, IsNil)
c.Check(queues, HasLen, 2)
Expand Down Expand Up @@ -167,8 +167,8 @@ func (suite *QueueSuite) TestQueue(c *C) {
queues, err := connection.getConsumingQueues()
c.Check(err, IsNil)
c.Check(queues, HasLen, 0)
c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond), Equals, ErrorAlreadyConsuming)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), Equals, ErrorAlreadyConsuming)
cons1name, err := queue.AddConsumer("queue-cons1", NewTestConsumer("queue-A"))
c.Check(err, IsNil)
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (suite *QueueSuite) TestConsumer(c *C) {

consumer := NewTestConsumer("cons-A")
consumer.AutoAck = false
c.Check(queue1.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue1.StartConsuming(10, time.Millisecond, nil), IsNil)
_, err = queue1.AddConsumer("cons-cons", consumer)
c.Check(err, IsNil)
c.Check(consumer.LastDelivery, IsNil)
Expand Down Expand Up @@ -302,7 +302,7 @@ func (suite *QueueSuite) TestConsumer(c *C) {

queue2, err := connection.OpenQueue("cons-func-q")
c.Check(err, IsNil)
c.Check(queue2.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue2.StartConsuming(10, time.Millisecond, nil), IsNil)

payloadChan := make(chan string, 1)
payload := "cons-func-payload"
Expand Down Expand Up @@ -348,7 +348,7 @@ func (suite *QueueSuite) TestMulti(c *C) {
c.Check(err, IsNil)
c.Check(count, Equals, int64(0))

c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
time.Sleep(2 * time.Millisecond)
count, err = queue.readyCount()
c.Check(err, IsNil)
Expand Down Expand Up @@ -426,7 +426,7 @@ func (suite *QueueSuite) TestBatch(c *C) {
c.Check(err, IsNil)
}

c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
time.Sleep(10 * time.Millisecond)
count, err := queue.unackedCount()
c.Check(err, IsNil)
Expand Down Expand Up @@ -507,7 +507,7 @@ func (suite *QueueSuite) TestReturnRejected(c *C) {
c.Check(err, IsNil)
c.Check(count, Equals, int64(0))

c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
time.Sleep(time.Millisecond)
count, err = queue.readyCount()
c.Check(err, IsNil)
Expand Down Expand Up @@ -595,14 +595,14 @@ func (suite *QueueSuite) TestPushQueue(c *C) {
consumer1 := NewTestConsumer("push-cons")
consumer1.AutoAck = false
consumer1.AutoFinish = false
c.Check(queue1.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue1.StartConsuming(10, time.Millisecond, nil), IsNil)
_, err = queue1.AddConsumer("push-cons", consumer1)
c.Check(err, IsNil)

consumer2 := NewTestConsumer("push-cons")
consumer2.AutoAck = false
consumer2.AutoFinish = false
c.Check(queue2.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue2.StartConsuming(10, time.Millisecond, nil), IsNil)
_, err = queue2.AddConsumer("push-cons", consumer2)
c.Check(err, IsNil)

Expand Down Expand Up @@ -644,7 +644,7 @@ func (suite *QueueSuite) TestConsuming(c *C) {
c.FailNow() // should return closed finishedChan
}

c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
c.Check(queue.StopConsuming(), NotNil)
// already stopped
c.Check(queue.StopConsuming(), NotNil)
Expand All @@ -670,7 +670,7 @@ func (suite *QueueSuite) TestStopConsuming_Consumer(c *C) {
c.Check(err, IsNil)
}

c.Check(queue.StartConsuming(20, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(20, time.Millisecond, nil), IsNil)
var consumers []*TestConsumer
for i := 0; i < 10; i++ {
consumer := NewTestConsumer("c" + strconv.Itoa(i))
Expand Down Expand Up @@ -714,7 +714,7 @@ func (suite *QueueSuite) TestStopConsuming_BatchConsumer(c *C) {
c.Check(err, IsNil)
}

c.Check(queue.StartConsuming(20, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(20, time.Millisecond, nil), IsNil)

var consumers []*TestBatchConsumer
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -763,7 +763,7 @@ func (suite *QueueSuite) BenchmarkQueue(c *C) {
consumer := NewTestConsumer("bench-A")
// consumer.SleepDuration = time.Microsecond
consumers = append(consumers, consumer)
c.Check(queue.StartConsuming(10, time.Millisecond), IsNil)
c.Check(queue.StartConsuming(10, time.Millisecond, nil), IsNil)
_, err = queue.AddConsumer("bench-cons", consumer)
c.Check(err, IsNil)
}
Expand Down
2 changes: 1 addition & 1 deletion stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (suite *StatsSuite) TestStats(c *C) {
c.Check(err, IsNil)
consumer := NewTestConsumer("hand-A")
consumer.AutoAck = false
c.Check(q2.StartConsuming(10, time.Millisecond), IsNil)
c.Check(q2.StartConsuming(10, time.Millisecond, nil), IsNil)
_, err = q2.AddConsumer("stats-cons1", consumer)
c.Check(err, IsNil)
c.Check(q2.Publish("stats-d2"), IsNil)
Expand Down
34 changes: 17 additions & 17 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,29 @@ func (queue *TestQueue) PublishBytes(payload ...[]byte) error {
return queue.Publish(stringifiedBytes...)
}

func (queue *TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) }
func (queue *TestQueue) StartConsuming(int64, time.Duration) error { panic(errorNotSupported) }
func (queue *TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) }
func (queue *TestQueue) AddConsumer(string, Consumer) (string, error) { panic(errorNotSupported) }
func (queue *TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) {
func (*TestQueue) SetPushQueue(Queue) { panic(errorNotSupported) }
func (*TestQueue) StartConsuming(int64, time.Duration, chan<- error) error {
panic(errorNotSupported)
}
func (queue *TestQueue) AddBatchConsumer(string, int64, BatchConsumer) (string, error) {
func (*TestQueue) StopConsuming() <-chan struct{} { panic(errorNotSupported) }
func (*TestQueue) AddConsumer(string, Consumer) (string, error) { panic(errorNotSupported) }
func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) { panic(errorNotSupported) }
func (*TestQueue) AddBatchConsumer(string, int64, BatchConsumer) (string, error) {
panic(errorNotSupported)
}
func (queue *TestQueue) AddBatchConsumerWithTimeout(string, int64, time.Duration, BatchConsumer) (string, error) {
func (*TestQueue) AddBatchConsumerWithTimeout(string, int64, time.Duration, BatchConsumer) (string, error) {
panic(errorNotSupported)
}
func (queue *TestQueue) ReturnUnacked(int64) (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) PurgeRejected() (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) Destroy() (int64, int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) closeInStaleConnection() error { panic(errorNotSupported) }
func (queue *TestQueue) readyCount() (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) unackedCount() (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) rejectedCount() (int64, error) { panic(errorNotSupported) }
func (queue *TestQueue) getConsumers() ([]string, error) { panic(errorNotSupported) }
func (*TestQueue) ReturnUnacked(int64) (int64, error) { panic(errorNotSupported) }
func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported) }
func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) PurgeRejected() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) Destroy() (int64, int64, error) { panic(errorNotSupported) }
func (*TestQueue) closeInStaleConnection() error { panic(errorNotSupported) }
func (*TestQueue) readyCount() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) unackedCount() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) rejectedCount() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) getConsumers() ([]string, error) { panic(errorNotSupported) }

// test helper

Expand Down

0 comments on commit 0c4be35

Please sign in to comment.