Skip to content

Commit

Permalink
Merge pull request #18 from grisu48/rabbitmq
Browse files Browse the repository at this point in the history
Add reconnect handlers to rabbitmq
  • Loading branch information
grisu48 authored Mar 9, 2023
2 parents 92ff997 + d6f92d5 commit 8ca7dfa
Showing 1 changed file with 55 additions and 3 deletions.
58 changes: 55 additions & 3 deletions rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,80 @@ type CommentMQ struct {
type RabbitMQ struct {
remote string
con *amqp.Connection
closed bool
}

// Callback when the connection was closed
type RabbitMQCloseCallback func(error)

// Close connection
func (mq *RabbitMQ) Close() {
mq.closed = true
mq.con.Close()
}

// Connected returns true if RabbitMQ is connected
func (mq *RabbitMQ) Connected() bool {
return !mq.closed && !mq.con.IsClosed()
}

// Connected returns true if RabbitMQ is closing or if it is closed.
func (mq *RabbitMQ) Closed() bool {
if mq.closed {
return true
}
if mq.con.IsClosed() {
mq.closed = true
return true
}
return false
}

// Reconnect to the RabbitMQ server. This will close any previous connections and channels
func (mq *RabbitMQ) Reconnect() error {
var err error
mq.con.Close()
mq.closed = false
mq.con, err = amqp.Dial(mq.remote)
return err
}

// NotifyClose registeres a defined callback function for when the RabbitMQ connection is closed
func (mq *RabbitMQ) NotifyClose(callback RabbitMQCloseCallback) {
go func() {
recvChannel := make(chan *amqp.Error, 1)
mq.con.NotifyClose(recvChannel)
for err := range recvChannel {
callback(fmt.Errorf(err.Error()))
}
}()
}

// RabbitMQSubscription handles a single subscription
type RabbitMQSubscription struct {
channel *amqp.Channel
key string
obs <-chan amqp.Delivery
mq *RabbitMQ
con *amqp.Connection // Keep a reference to the connection to check if it is still connected. This is necessary because mq can reconnect and therefore have another new mq.con instance
}

// Connected returns true if RabbitMQ is connected
func (sub *RabbitMQSubscription) Connected() bool {
return !sub.con.IsClosed()
}

// Receive receives a raw non-empty RabbitMQ messages
func (sub *RabbitMQSubscription) Receive() (amqp.Delivery, error) {
for {
msg := <-sub.obs
for msg, ok := <-sub.obs; ok; {
if len(msg.Body) > 0 {
return msg, nil
}
}
if sub.mq.closed || sub.con.IsClosed() {
return amqp.Delivery{}, fmt.Errorf("EOF")
}
return amqp.Delivery{}, fmt.Errorf("channel unexpectedly closed")
}

// ReceiveJob receives the next message and try to parse it as job
Expand Down Expand Up @@ -174,7 +225,7 @@ func (sub *RabbitMQSubscription) Close() {
// ConnectRabbitMQ connects to a RabbitMQ instance and returns the RabbitMQ object
func ConnectRabbitMQ(remote string) (RabbitMQ, error) {
var err error
rmq := RabbitMQ{remote: remote}
rmq := RabbitMQ{remote: remote, closed: false}

rmq.con, err = amqp.Dial(remote)
if err != nil {
Expand Down Expand Up @@ -213,5 +264,6 @@ func (mq *RabbitMQ) Subscribe(key string) (RabbitMQSubscription, error) {
sub.channel = ch
sub.key = key
sub.obs = obs
sub.con = mq.con
return sub, nil
}

0 comments on commit 8ca7dfa

Please sign in to comment.