Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal/rabbitmq] move connection and retry logic into separate pkg #34361

Merged
merged 12 commits into from
Aug 14, 2024
Prev Previous commit
Next Next commit
internal/rabbitmq: move retry logic into connection
Signed-off-by: Benedikt Bongartz <bongartz@klimlive.de>
  • Loading branch information
frzifus committed Aug 1, 2024
commit 883cb6b8b3bbbab3aef99982d67e623965c3fec4
2 changes: 1 addition & 1 deletion exporter/rabbitmqexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func getRoutingKeyOrDefault(config *Config, fallback string) string {

func newPublisherFactory(set exporter.Settings) publisherFactory {
return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) {
return publisher.NewConnection(set.Logger, rabbitmq.NewAmqpClient(), dialConfig)
return publisher.NewConnection(set.Logger, rabbitmq.NewAmqpClient(set.Logger), dialConfig)
}
}

Expand Down
112 changes: 18 additions & 94 deletions exporter/rabbitmqexporter/internal/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,20 @@ package publisher // import "github.com/open-telemetry/opentelemetry-collector-c

import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it's worth aliasing this import so its clearer when skimming the code that rabbitmq refers to an otel-contrib package and not the external rabbitmq client library

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otelrabbitmq?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it 👍

)

type DialConfig struct {
URL string
rabbitmq.DialConfig
Durable bool
Vhost string
Auth amqp.Authentication
ConnectionTimeout time.Duration
Heartbeat time.Duration
PublishConfirmationTimeout time.Duration
TLS *tls.Config
ConnectionName string
}

type Message struct {
Expand All @@ -37,18 +29,17 @@ type Message struct {

func NewConnection(logger *zap.Logger, client rabbitmq.AmqpClient, config DialConfig) (Publisher, error) {
p := publisher{
logger: logger,
client: client,
config: config,
connLock: &sync.Mutex{},
connectionErrors: make(chan *amqp.Error, 1),
logger: logger,
client: client,
config: config,
}

p.connLock.Lock()
defer p.connLock.Unlock()
err := p.connect()

return &p, err
conn, err := p.client.DialConfig(p.config.DialConfig)
if err != nil {
return nil, err
}
p.connection = conn
return &p, nil
}

type Publisher interface {
Expand All @@ -57,16 +48,14 @@ type Publisher interface {
}

type publisher struct {
logger *zap.Logger
client rabbitmq.AmqpClient
config DialConfig
connLock *sync.Mutex
connection rabbitmq.Connection
connectionErrors chan *amqp.Error
logger *zap.Logger
client rabbitmq.AmqpClient
config DialConfig
connection rabbitmq.Connection
}

func (p *publisher) Publish(ctx context.Context, message Message) error {
err := p.reconnectIfUnhealthy()
err := p.connection.ReconnectIfUnhealthy()
if err != nil {
return err
}
Expand Down Expand Up @@ -127,71 +116,6 @@ func (p *publisher) Publish(ctx context.Context, message Message) error {
}
}

func (p *publisher) reconnectIfUnhealthy() error {
p.connLock.Lock()
defer p.connLock.Unlock()

hasConnectionError := false
select {
case err := <-p.connectionErrors:
hasConnectionError = true
p.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err))
default:
break
}

if hasConnectionError || !p.isConnected() {
if p.isConnected() {
err := p.connection.Close()
if err != nil {
p.logger.Warn("Error closing unhealthy connection", zap.Error(err))
}
}

if err := p.connect(); err != nil {
return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err)
}
p.logger.Info("Successfully restored unhealthy rabbitmq connection")
}

return nil
}

func (p *publisher) connect() error {
p.logger.Debug("Connecting to rabbitmq")

properties := amqp.Table{}
properties.SetClientConnectionName(p.config.ConnectionName)

connection, err := p.client.DialConfig(p.config.URL, amqp.Config{
SASL: []amqp.Authentication{p.config.Auth},
Vhost: p.config.Vhost,
Heartbeat: p.config.Heartbeat,
Dial: amqp.DefaultDial(p.config.ConnectionTimeout),
Properties: properties,
TLSClientConfig: p.config.TLS,
})
if connection != nil {
p.connection = connection
}
if err != nil {
return err
}

// Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors.
// Also re-create this channel each time because apparently the amqp library can close it
p.connectionErrors = make(chan *amqp.Error, 1)
p.connection.NotifyClose(p.connectionErrors)
return nil
}

func (p *publisher) Close() error {
if p.isConnected() {
return p.connection.Close()
}
return nil
}

func (p *publisher) isConnected() bool {
return p.connection != nil && !p.connection.IsClosed()
return p.connection.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
)

const (
Expand Down
21 changes: 12 additions & 9 deletions exporter/rabbitmqexporter/rabbitmq_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
)

type rabbitmqExporter struct {
Expand Down Expand Up @@ -50,17 +51,19 @@ func (e *rabbitmqExporter) start(ctx context.Context, host component.Host) error
e.marshaler = m

dialConfig := publisher.DialConfig{
URL: e.config.Connection.Endpoint,
Vhost: e.config.Connection.VHost,
Auth: &amqp.PlainAuth{
Username: e.config.Connection.Auth.Plain.Username,
Password: e.config.Connection.Auth.Plain.Password,
},
Durable: e.config.Durable,
ConnectionName: e.connectionName,
ConnectionTimeout: e.config.Connection.ConnectionTimeout,
Heartbeat: e.config.Connection.Heartbeat,
PublishConfirmationTimeout: e.config.Connection.PublishConfirmationTimeout,
DialConfig: rabbitmq.DialConfig{
URL: e.config.Connection.Endpoint,
Vhost: e.config.Connection.VHost,
Auth: &amqp.PlainAuth{
Username: e.config.Connection.Auth.Plain.Username,
Password: e.config.Connection.Auth.Plain.Password,
},
ConnectionName: e.connectionName,
ConnectionTimeout: e.config.Connection.ConnectionTimeout,
Heartbeat: e.config.Connection.Heartbeat,
},
}

tlsConfig, err := e.tlsFactory(ctx)
Expand Down
124 changes: 106 additions & 18 deletions internal/rabbitmq/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ package rabbitmq // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"context"
"crypto/tls"
"errors"
"sync"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)

type AmqpClient interface {
DialConfig(url string, config amqp.Config) (Connection, error)
DialConfig(config DialConfig) (Connection, error)
}

type Connection interface {
ReconnectIfUnhealthy() error
IsClosed() bool
Channel() (Channel, error)
NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
Expand All @@ -32,14 +38,13 @@ type DeferredConfirmation interface {
Acked() bool
}

func NewAmqpClient() AmqpClient {
return &client{}
}

type client struct{}

type connectionHolder struct {
connection *amqp.Connection
url string
config amqp.Config
connection *amqp.Connection
logger *zap.Logger
connLock *sync.Mutex
connectionErrors chan *amqp.Error
}

type channelHolder struct {
Expand All @@ -50,15 +55,102 @@ type deferredConfirmationHolder struct {
confirmation *amqp.DeferredConfirmation
}

func (*client) DialConfig(url string, config amqp.Config) (Connection, error) {
con, err := amqp.DialConfig(url, config)
type DialConfig struct {
URL string
Vhost string
Auth amqp.Authentication
ConnectionTimeout time.Duration
Heartbeat time.Duration
TLS *tls.Config
ConnectionName string
}

func NewAmqpClient(logger *zap.Logger) AmqpClient {
return &client{logger: logger}
}

type client struct {
logger *zap.Logger
}

func (c *client) DialConfig(config DialConfig) (Connection, error) {
properties := amqp.Table{}
properties.SetClientConnectionName(config.ConnectionName)
ch := &connectionHolder{
url: config.URL,
config: amqp.Config{
SASL: []amqp.Authentication{config.Auth},
Vhost: config.Vhost,
TLSClientConfig: config.TLS,
Heartbeat: config.Heartbeat,
Dial: amqp.DefaultDial(config.ConnectionTimeout),
Properties: properties,
},
logger: c.logger,
connLock: &sync.Mutex{},
connectionErrors: make(chan *amqp.Error),
}

return ch, ch.connect()
}

func (c *connectionHolder) ReconnectIfUnhealthy() error {
c.connLock.Lock()
defer c.connLock.Unlock()

hasConnectionError := false
select {
case err := <-c.connectionErrors:
hasConnectionError = true
c.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err))
default:
break
}

if hasConnectionError || !c.isConnected() {
if c.isConnected() {
err := c.connection.Close()
if err != nil {
c.logger.Warn("Error closing unhealthy connection", zap.Error(err))
}
}

if err := c.connect(); err != nil {
return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err)
}
c.logger.Info("Successfully restored unhealthy rabbitmq connection")
}

return nil
}

func (c *connectionHolder) connect() error {
c.logger.Debug("Connecting to rabbitmq")

connection, err := amqp.DialConfig(c.url, c.config)
if connection != nil {
c.connection = connection
}
if err != nil {
return nil, err
return err
}

return &connectionHolder{
connection: con,
}, nil
// Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors.
// Also re-create this channel each time because apparently the amqp library can close it
c.connectionErrors = make(chan *amqp.Error, 1)
c.connection.NotifyClose(c.connectionErrors)
return nil
}

func (c *connectionHolder) Close() error {
if c.isConnected() {
return c.connection.Close()
}
return nil
}

func (c *connectionHolder) isConnected() bool {
return c.connection != nil && !c.connection.IsClosed()
}

func (c *connectionHolder) Channel() (Channel, error) {
Expand All @@ -77,10 +169,6 @@ func (c *connectionHolder) NotifyClose(receiver chan *amqp.Error) chan *amqp.Err
return c.connection.NotifyClose(receiver)
}

func (c *connectionHolder) Close() error {
return c.connection.Close()
}

func (c *channelHolder) Confirm(noWait bool) error {
return c.channel.Confirm(noWait)
}
Expand Down
7 changes: 6 additions & 1 deletion internal/rabbitmq/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbit

go 1.21.0
codeboten marked this conversation as resolved.
Show resolved Hide resolved

require github.com/rabbitmq/amqp091-go v1.10.0
require (
github.com/rabbitmq/amqp091-go v1.10.0
go.uber.org/zap v1.27.0
)

require go.uber.org/multierr v1.10.0 // indirect
Loading