Skip to content

Commit

Permalink
Remove ok bool return values in many places
Browse files Browse the repository at this point in the history
Change int to int64 in many places
  • Loading branch information
wellle committed May 23, 2020
1 parent 7cb665d commit 17c5df6
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 432 deletions.
8 changes: 2 additions & 6 deletions cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func CleanConnection(connection *redisConnection) error {
}

for _, queueName := range queueNames {
// TODO: can we avoid this type assertion/check?
queue, ok := connection.OpenQueue(queueName).(*redisQueue)
// TODO: these would merge if we returned redis.nil
if err != nil {
Expand All @@ -58,15 +59,10 @@ func CleanConnection(connection *redisConnection) error {
CleanQueue(queue)
}

ok, err := connection.Close()
if err != nil {
if err := connection.Close(); err != nil {
return err
}

if !ok {
return fmt.Errorf("rmq cleaner failed to close connection %s", connection)
}

if err := connection.CloseAllQueuesInConnection(); err != nil {
return fmt.Errorf("rmq cleaner failed to close all queues %s %s", connection, err)
}
Expand Down
62 changes: 31 additions & 31 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,43 @@ func (suite *CleanerSuite) TestCleaner(c *C) {

count, err := queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 0)
c.Check(count, Equals, int64(0))
queue.Publish("del1")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 1)
c.Check(count, Equals, int64(1))
queue.Publish("del2")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
queue.Publish("del3")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 3)
c.Check(count, Equals, int64(3))
queue.Publish("del4")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 4)
c.Check(count, Equals, int64(4))
queue.Publish("del5")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 5)
c.Check(count, Equals, int64(5))
queue.Publish("del6")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 6)
c.Check(count, Equals, int64(6))

count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 0)
c.Check(count, Equals, int64(0))
queue.StartConsuming(2, time.Millisecond)
time.Sleep(time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 4)
c.Check(count, Equals, int64(4))

consumer := NewTestConsumer("c-A")
consumer.AutoFinish = false
Expand All @@ -82,10 +82,10 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
time.Sleep(10 * time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 4)
c.Check(count, Equals, int64(4))

c.Assert(consumer.LastDelivery, NotNil)
c.Check(consumer.LastDelivery.Payload(), Equals, "del1")
Expand All @@ -95,19 +95,19 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
time.Sleep(10 * time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 3)
c.Check(count, Equals, int64(3))

consumer.Finish()
time.Sleep(10 * time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 3)
c.Check(count, Equals, int64(3))
c.Check(consumer.LastDelivery.Payload(), Equals, "del2")

queue.StopConsuming()
Expand All @@ -121,35 +121,35 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
queue.Publish("del7")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 4)
c.Check(count, Equals, int64(4))
queue.Publish("del8")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 5)
c.Check(count, Equals, int64(5))
queue.Publish("del9")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 6)
c.Check(count, Equals, int64(6))
queue.Publish("del10")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 7)
c.Check(count, Equals, int64(7))
queue.Publish("del11")
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 8)
c.Check(count, Equals, int64(8))

count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 0)
c.Check(count, Equals, int64(0))
queue.StartConsuming(2, time.Millisecond)
time.Sleep(time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 6)
c.Check(count, Equals, int64(6))

consumer = NewTestConsumer("c-B")
consumer.AutoFinish = false
Expand All @@ -159,20 +159,20 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
time.Sleep(10 * time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 6)
c.Check(count, Equals, int64(6))
c.Check(consumer.LastDelivery.Payload(), Equals, "del4")

consumer.Finish() // unacked
time.Sleep(10 * time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 6)
c.Check(count, Equals, int64(6))

c.Check(consumer.LastDelivery.Payload(), Equals, "del5")
ok, err = consumer.LastDelivery.Ack()
Expand All @@ -181,10 +181,10 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
time.Sleep(10 * time.Millisecond)
count, err = queue.UnackedCount()
c.Check(err, IsNil)
c.Check(count, Equals, 2)
c.Check(count, Equals, int64(2))
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 5)
c.Check(count, Equals, int64(5))

queue.StopConsuming()
conn.StopHeartbeat()
Expand All @@ -196,7 +196,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
c.Check(cleaner.Clean(), IsNil)
count, err = queue.ReadyCount()
c.Check(err, IsNil)
c.Check(count, Equals, 9) // 2 of 11 were acked above
c.Check(count, Equals, int64(9)) // 2 of 11 were acked above
queues, err = conn.GetOpenQueues()
c.Check(err, IsNil)
c.Check(queues, HasLen, 2)
Expand Down
34 changes: 13 additions & 21 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,12 @@ func openConnectionWithRedisClient(tag string, redisClient RedisClient) (*redisC
redisClient: redisClient,
}

ok, err := connection.updateHeartbeat()
// TODO: do it like this in more places?
if !ok || err != nil { // checks the connection
if err := connection.updateHeartbeat(); err != nil { // checks the connection
return nil, err
}

// add to connection set after setting heartbeat to avoid race with cleaner
ok, err = redisClient.SAdd(connectionsKey, name)
if !ok {
if err := redisClient.SAdd(connectionsKey, name); err != nil {
return nil, err
}

Expand Down Expand Up @@ -99,21 +96,21 @@ func (connection *redisConnection) GetConnections() ([]string, error) {
// Check retuns true if the connection is currently active in terms of heartbeat
func (connection *redisConnection) Check() (bool, error) {
heartbeatKey := strings.Replace(connectionHeartbeatTemplate, phConnection, connection.Name, 1)
ttl, _, err := connection.redisClient.TTL(heartbeatKey)
ttl, err := connection.redisClient.TTL(heartbeatKey)
return ttl > 0, err
}

// StopHeartbeat stops the heartbeat of the connection
// it does not remove it from the list of connections so it can later be found by the cleaner
func (connection *redisConnection) StopHeartbeat() (bool, error) {
func (connection *redisConnection) StopHeartbeat() error {
connection.heartbeatStopped = true
_, ok, err := connection.redisClient.Del(connection.heartbeatKey)
return ok, err
_, err := connection.redisClient.Del(connection.heartbeatKey)
return err
}

func (connection *redisConnection) Close() (bool, error) {
_, ok, err := connection.redisClient.SRem(connectionsKey, connection.Name)
return ok, err
func (connection *redisConnection) Close() error {
_, err := connection.redisClient.SRem(connectionsKey, connection.Name)
return err
}

// GetOpenQueues returns a list of all open queues
Expand All @@ -122,8 +119,8 @@ func (connection *redisConnection) GetOpenQueues() ([]string, error) {
}

// CloseAllQueues closes all queues by removing them from the global list
func (connection *redisConnection) CloseAllQueues() (int, error) {
count, _, err := connection.redisClient.Del(queuesKey)
func (connection *redisConnection) CloseAllQueues() (int64, error) {
count, err := connection.redisClient.Del(queuesKey)
return count, err
}

Expand All @@ -142,17 +139,12 @@ func (connection *redisConnection) GetConsumingQueues() ([]string, error) {
// heartbeat keeps the heartbeat key alive
func (connection *redisConnection) heartbeat() {
for {
ok, err := connection.updateHeartbeat()
if err != nil {
if err := connection.updateHeartbeat(); err != nil {
// TODO: what to do here???
// one idea was to wait a bit and retry, but make sure the key wasn't expired in between
// if it was, do panic
}

if !ok {
// log.Printf("rmq connection failed to update heartbeat %s", connection)
}

time.Sleep(time.Second)

if connection.heartbeatStopped {
Expand All @@ -162,7 +154,7 @@ func (connection *redisConnection) heartbeat() {
}
}

func (connection *redisConnection) updateHeartbeat() (bool, error) {
func (connection *redisConnection) updateHeartbeat() error {
return connection.redisClient.Set(connection.heartbeatKey, "1", heartbeatDuration)
}

Expand Down
11 changes: 7 additions & 4 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ func (delivery *wrapDelivery) Payload() string {
func (delivery *wrapDelivery) Ack() (bool, error) {
// debug(fmt.Sprintf("delivery ack %s", delivery)) // COMMENTOUT

count, ok, err := delivery.redisClient.LRem(delivery.unackedKey, 1, delivery.payload)
return ok && count == 1, err
// TODO: check for other places where we get a returned affectedCount but
// ignore it. consider checking the value
count, err := delivery.redisClient.LRem(delivery.unackedKey, 1, delivery.payload)
// TODO: return error only, different error if count != 1?
return count == 1, err
}

func (delivery *wrapDelivery) Reject() (bool, error) {
Expand All @@ -57,11 +60,11 @@ func (delivery *wrapDelivery) Push() (bool, error) {
}

func (delivery *wrapDelivery) move(key string) (bool, error) {
if ok, err := delivery.redisClient.LPush(key, delivery.payload); !ok {
if err := delivery.redisClient.LPush(key, delivery.payload); err != nil {
return false, err
}

if _, ok, err := delivery.redisClient.LRem(delivery.unackedKey, 1, delivery.payload); !ok {
if _, err := delivery.redisClient.LRem(delivery.unackedKey, 1, delivery.payload); err != nil {
return false, err
}

Expand Down
2 changes: 2 additions & 0 deletions example/cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func main() {
for _ = range time.Tick(time.Second) {
if err := cleaner.Clean(); err != nil {
log.Printf("failed to clean: %s", err)
continue
}
log.Printf("cleaned")
}
}
9 changes: 8 additions & 1 deletion example/purger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/adjust/rmq/v2"
"log"
)

func main() {
Expand All @@ -11,5 +12,11 @@ func main() {
}

queue := connection.OpenQueue("things")
queue.PurgeReady()
count, err := queue.PurgeReady()
if err != nil {
log.Printf("failed to purge: %s", err)
return
}

log.Printf("purged %d", count)
}
Loading

0 comments on commit 17c5df6

Please sign in to comment.