Skip to content

Commit

Permalink
Inject error chan into OpenConnection functions
Browse files Browse the repository at this point in the history
  • Loading branch information
wellle committed May 23, 2020
1 parent 0c4be35 commit 0218bd4
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 63 deletions.
10 changes: 5 additions & 5 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ func TestCleanerSuite(t *testing.T) {
type CleanerSuite struct{}

func (suite *CleanerSuite) TestCleaner(c *C) {
flushConn, err := OpenConnection("cleaner-flush", "tcp", "localhost:6379", 1)
flushConn, err := OpenConnection("cleaner-flush", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
c.Check(flushConn.stopHeartbeat(), IsNil)
c.Check(flushConn.flushDb(), IsNil)

conn, err := OpenConnection("cleaner-conn1", "tcp", "localhost:6379", 1)
conn, err := OpenConnection("cleaner-conn1", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queues, err := conn.GetOpenQueues()
c.Check(err, IsNil)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
c.Check(conn.stopHeartbeat(), IsNil)
time.Sleep(time.Millisecond)

conn, err = OpenConnection("cleaner-conn1", "tcp", "localhost:6379", 1)
conn, err = OpenConnection("cleaner-conn1", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err = conn.OpenQueue("q1")
c.Check(err, IsNil)
Expand Down Expand Up @@ -192,7 +192,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
c.Check(conn.stopHeartbeat(), IsNil)
time.Sleep(time.Millisecond)

cleanerConn, err := OpenConnection("cleaner-conn", "tcp", "localhost:6379", 1)
cleanerConn, err := OpenConnection("cleaner-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
cleaner := NewCleaner(cleanerConn)
returned, err := cleaner.Clean()
Expand All @@ -205,7 +205,7 @@ func (suite *CleanerSuite) TestCleaner(c *C) {
c.Check(err, IsNil)
c.Check(queues, HasLen, 2)

conn, err = OpenConnection("cleaner-conn1", "tcp", "localhost:6379", 1)
conn, err = OpenConnection("cleaner-conn1", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err = conn.OpenQueue("q1")
c.Check(err, IsNil)
Expand Down
58 changes: 31 additions & 27 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
// entitify being connection/queue/delivery
var ErrorNotFound = errors.New("entity not found")

const heartbeatDuration = time.Minute
const (
heartbeatDuration = time.Minute // TTL of heartbeat key
heartbeatInterval = time.Second // how often we update the heartbeat key
)

// Connection is an interface that can be used to test publishing
type Connection interface {
Expand Down Expand Up @@ -46,18 +49,28 @@ type redisConnection struct {
heartbeatStopped bool
}

// OpenConnection opens and returns a new connection
func OpenConnection(tag, network, address string, db int, errors chan<- error) (Connection, error) {
redisClient := redis.NewClient(&redis.Options{
Network: network,
Addr: address,
DB: db,
})
return OpenConnectionWithRedisClient(tag, redisClient, errors)
}

// OpenConnectionWithRedisClient opens and returns a new connection
func OpenConnectionWithRedisClient(tag string, redisClient *redis.Client) (*redisConnection, error) {
return openConnectionWithRedisClient(tag, RedisWrapper{redisClient})
func OpenConnectionWithRedisClient(tag string, redisClient *redis.Client, errors chan<- error) (*redisConnection, error) {
return openConnectionWithRedisClient(tag, RedisWrapper{redisClient}, errors)
}

// OpenConnectionWithTestRedisClient opens and returns a new connection which
// uses a test redis client internally. This is useful in integration tests.
func OpenConnectionWithTestRedisClient(tag string) (*redisConnection, error) {
return openConnectionWithRedisClient(tag, NewTestRedisClient())
func OpenConnectionWithTestRedisClient(tag string, errors chan<- error) (*redisConnection, error) {
return openConnectionWithRedisClient(tag, NewTestRedisClient(), errors)
}

func openConnectionWithRedisClient(tag string, redisClient RedisClient) (*redisConnection, error) {
func openConnectionWithRedisClient(tag string, redisClient RedisClient, errors chan<- error) (*redisConnection, error) {
name := fmt.Sprintf("%s-%s", tag, uniuri.NewLen(6))

connection := &redisConnection{
Expand All @@ -76,21 +89,11 @@ func openConnectionWithRedisClient(tag string, redisClient RedisClient) (*redisC
return nil, err
}

go connection.heartbeat()
go connection.heartbeat(errors)
// log.Printf("rmq connection connected to %s %s:%s %d", name, network, address, db)
return connection, nil
}

// OpenConnection opens and returns a new connection
func OpenConnection(tag, network, address string, db int) (Connection, error) {
redisClient := redis.NewClient(&redis.Options{
Network: network,
Addr: address,
DB: db,
})
return OpenConnectionWithRedisClient(tag, redisClient)
}

// OpenQueue opens and returns the queue with a given name
func (connection *redisConnection) OpenQueue(name string) (Queue, error) {
if _, err := connection.redisClient.SAdd(queuesKey, name); err != nil {
Expand Down Expand Up @@ -158,19 +161,20 @@ func (connection *redisConnection) getConsumingQueues() ([]string, error) {
}

// heartbeat keeps the heartbeat key alive
func (connection *redisConnection) heartbeat() {
for {
if err := connection.updateHeartbeat(); err != nil {
// TODO!: what to do here???
// one idea was to wait a bit and retry, but make sure the key wasn't expired in between
// if it was, do panic
func (connection *redisConnection) heartbeat(errors chan<- error) {
for range time.NewTicker(heartbeatInterval).C {
if connection.heartbeatStopped {
return
}

time.Sleep(time.Second)
if err := connection.updateHeartbeat(); err != nil {
select { // try to add error to channel, but don't block
// TODO!: add error count or similar?
case errors <- &HeartbeatError{RedisErr: err}:
default:
}

if connection.heartbeatStopped {
// log.Printf("rmq connection stopped heartbeat %s", connection)
return
// TODO!: stop all consuming at some point (after 40s?)
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@ type ConsumeError struct {
func (e *ConsumeError) Error() string {
return "rmq.ConsumeError: " + e.RedisErr.Error()
}

type HeartbeatError struct {
RedisErr error
}

func (e *HeartbeatError) Error() string {
return "rmq.HeartbeatError: " + e.RedisErr.Error()
}
2 changes: 1 addition & 1 deletion example/batch_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
const unackedLimit = 1000

func main() {
connection, err := rmq.OpenConnection("consumer", "tcp", "localhost:6379", 2)
connection, err := rmq.OpenConnection("consumer", "tcp", "localhost:6379", 2, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example/cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func main() {
connection, err := rmq.OpenConnection("cleaner", "tcp", "localhost:6379", 2)
connection, err := rmq.OpenConnection("cleaner", "tcp", "localhost:6379", 2, nil)
if err != nil {
panic(err)
}
Expand Down
22 changes: 15 additions & 7 deletions example/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,21 @@ const (
)

func main() {
connection, err := rmq.OpenConnection("consumer", "tcp", "localhost:6379", 2)
errors := make(chan error, 10)
go func() {
for err := range errors {
switch err := err.(type) {
case *rmq.ConsumeError:
log.Print("consume error: ", err)
case *rmq.HeartbeatError:
log.Print("heartbeat error: ", err)
default:
log.Print("other error: ", err)
}
}
}()

connection, err := rmq.OpenConnection("consumer", "tcp", "localhost:6379", 2, errors)
if err != nil {
panic(err)
}
Expand All @@ -25,12 +39,6 @@ func main() {
panic(err)
}

errors := make(chan error, 10)
go func() {
for err := range errors {
log.Print("error: ", err)
}
}()
if err := queue.StartConsuming(unackedLimit, 500*time.Millisecond, errors); err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func main() {
connection, err := rmq.OpenConnection("handler", "tcp", "localhost:6379", 2)
connection, err := rmq.OpenConnection("handler", "tcp", "localhost:6379", 2, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
)

func main() {
connection, err := rmq.OpenConnection("producer", "tcp", "localhost:6379", 2)
connection, err := rmq.OpenConnection("producer", "tcp", "localhost:6379", 2, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example/purger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func main() {
connection, err := rmq.OpenConnection("cleaner", "tcp", "localhost:6379", 2)
connection, err := rmq.OpenConnection("cleaner", "tcp", "localhost:6379", 2, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example/returner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func main() {
connection, err := rmq.OpenConnection("returner", "tcp", "localhost:6379", 2)
connection, err := rmq.OpenConnection("returner", "tcp", "localhost:6379", 2, nil)
if err != nil {
panic(err)
}
Expand Down
30 changes: 15 additions & 15 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func TestQueueSuite(t *testing.T) {
type QueueSuite struct{}

func (suite *QueueSuite) TestConnections(c *C) {
flushConn, err := OpenConnection("conns-flush", "tcp", "localhost:6379", 1)
flushConn, err := OpenConnection("conns-flush", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
c.Check(flushConn.stopHeartbeat(), IsNil)
c.Check(flushConn.flushDb(), IsNil)

connection, err := OpenConnection("conns-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("conns-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
c.Assert(connection, NotNil)
_, err = NewCleaner(connection).Clean()
Expand All @@ -32,14 +32,14 @@ func (suite *QueueSuite) TestConnections(c *C) {
c.Check(err, IsNil)
c.Check(connections, HasLen, 1) // cleaner connection remains

conn1, err := OpenConnection("conns-conn1", "tcp", "localhost:6379", 1)
conn1, err := OpenConnection("conns-conn1", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
connections, err = connection.getConnections()
c.Check(err, IsNil)
c.Check(connections, HasLen, 2)
c.Check(connection.hijackConnection("nope").check(), Equals, ErrorNotFound)
c.Check(conn1.check(), IsNil)
conn2, err := OpenConnection("conns-conn2", "tcp", "localhost:6379", 1)
conn2, err := OpenConnection("conns-conn2", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
connections, err = connection.getConnections()
c.Check(err, IsNil)
Expand All @@ -66,7 +66,7 @@ func (suite *QueueSuite) TestConnections(c *C) {
}

func (suite *QueueSuite) TestConnectionQueues(c *C) {
connection, err := OpenConnection("conn-q-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("conn-q-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
c.Assert(connection, NotNil)

Expand Down Expand Up @@ -136,7 +136,7 @@ func (suite *QueueSuite) TestConnectionQueues(c *C) {
}

func (suite *QueueSuite) TestQueue(c *C) {
connection, err := OpenConnection("queue-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("queue-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
c.Assert(connection, NotNil)

Expand Down Expand Up @@ -189,7 +189,7 @@ func (suite *QueueSuite) TestQueue(c *C) {
}

func (suite *QueueSuite) TestConsumer(c *C) {
connection, err := OpenConnection("cons-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("cons-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
c.Assert(connection, NotNil)

Expand Down Expand Up @@ -330,7 +330,7 @@ func (suite *QueueSuite) TestConsumer(c *C) {
}

func (suite *QueueSuite) TestMulti(c *C) {
connection, err := OpenConnection("multi-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("multi-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err := connection.OpenQueue("multi-q")
c.Check(err, IsNil)
Expand Down Expand Up @@ -412,7 +412,7 @@ func (suite *QueueSuite) TestMulti(c *C) {
}

func (suite *QueueSuite) TestBatch(c *C) {
connection, err := OpenConnection("batch-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("batch-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err := connection.OpenQueue("batch-q")
c.Check(err, IsNil)
Expand Down Expand Up @@ -485,7 +485,7 @@ func (suite *QueueSuite) TestBatch(c *C) {
}

func (suite *QueueSuite) TestReturnRejected(c *C) {
connection, err := OpenConnection("return-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("return-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err := connection.OpenQueue("return-q")
c.Check(err, IsNil)
Expand Down Expand Up @@ -583,7 +583,7 @@ func (suite *QueueSuite) TestReturnRejected(c *C) {
}

func (suite *QueueSuite) TestPushQueue(c *C) {
connection, err := OpenConnection("push", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("push", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue1, err := connection.OpenQueue("queue1")
c.Check(err, IsNil)
Expand Down Expand Up @@ -631,7 +631,7 @@ func (suite *QueueSuite) TestPushQueue(c *C) {
}

func (suite *QueueSuite) TestConsuming(c *C) {
connection, err := OpenConnection("consume", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("consume", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err := connection.OpenQueue("consume-q")
c.Check(err, IsNil)
Expand All @@ -656,7 +656,7 @@ func (suite *QueueSuite) TestConsuming(c *C) {
}

func (suite *QueueSuite) TestStopConsuming_Consumer(c *C) {
connection, err := OpenConnection("consume", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("consume", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err := connection.OpenQueue("consume-q")
c.Check(err, IsNil)
Expand Down Expand Up @@ -700,7 +700,7 @@ func (suite *QueueSuite) TestStopConsuming_Consumer(c *C) {
}

func (suite *QueueSuite) TestStopConsuming_BatchConsumer(c *C) {
connection, err := OpenConnection("batchConsume", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("batchConsume", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queue, err := connection.OpenQueue("batchConsume-q")
c.Check(err, IsNil)
Expand Down Expand Up @@ -750,7 +750,7 @@ func (suite *QueueSuite) TestStopConsuming_BatchConsumer(c *C) {

func (suite *QueueSuite) BenchmarkQueue(c *C) {
// open queue
connection, err := OpenConnection("bench-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("bench-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
queueName := fmt.Sprintf("bench-q%d", c.N)
queue, err := connection.OpenQueue(queueName)
Expand Down
6 changes: 3 additions & 3 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func TestStatsSuite(t *testing.T) {
type StatsSuite struct{}

func (suite *StatsSuite) TestStats(c *C) {
connection, err := OpenConnection("stats-conn", "tcp", "localhost:6379", 1)
connection, err := OpenConnection("stats-conn", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
_, err = NewCleaner(connection).Clean()
c.Assert(err, IsNil)

conn1, err := OpenConnection("stats-conn1", "tcp", "localhost:6379", 1)
conn1, err := OpenConnection("stats-conn1", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
conn2, err := OpenConnection("stats-conn2", "tcp", "localhost:6379", 1)
conn2, err := OpenConnection("stats-conn2", "tcp", "localhost:6379", 1, nil)
c.Check(err, IsNil)
q1, err := conn2.OpenQueue("stats-q1")
c.Check(err, IsNil)
Expand Down

0 comments on commit 0218bd4

Please sign in to comment.