Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal/rabbitmq] move connection and retry logic into separate pkg #34361

Merged
merged 12 commits into from
Aug 14, 2024
Merged
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ internal/kubelet/ @open-teleme
internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
internal/otelarrow/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3
internal/pdatautil/ @open-telemetry/collector-contrib-approvers @djaglowski
internal/rabbitmq/ @open-telemetry/collector-contrib-approvers @swar8080 @atoulme
internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax
internal/sqlquery/ @open-telemetry/collector-contrib-approvers @crobert-1 @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig => ../../internal/k8sconfig
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ../../receiver/carbonreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter => ../../exporter/splunkhecexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter => ../../exporter/prometheusexporter
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.106.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.106.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.106.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq v0.106.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.106.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.106.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.106.1 // indirect
Expand Down Expand Up @@ -1073,6 +1074,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8ste

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ../../receiver/carbonreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter => ../../exporter/splunkhecexporter
Expand Down
3 changes: 2 additions & 1 deletion exporter/rabbitmqexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
)

const (
Expand Down Expand Up @@ -133,7 +134,7 @@ func getRoutingKeyOrDefault(config *Config, fallback string) string {

func newPublisherFactory(set exporter.Settings) publisherFactory {
return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) {
return publisher.NewConnection(set.Logger, publisher.NewAmqpClient(), dialConfig)
return publisher.NewConnection(set.Logger, rabbitmq.NewAmqpClient(set.Logger), dialConfig)
}
}

Expand Down
3 changes: 3 additions & 0 deletions exporter/rabbitmqexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.106.1
github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq v0.106.1
github.com/rabbitmq/amqp091-go v1.10.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
Expand Down Expand Up @@ -110,6 +111,8 @@ require (

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
Expand Down
110 changes: 0 additions & 110 deletions exporter/rabbitmqexporter/internal/publisher/client.go

This file was deleted.

114 changes: 22 additions & 92 deletions exporter/rabbitmqexporter/internal/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,20 @@ package publisher // import "github.com/open-telemetry/opentelemetry-collector-c

import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"

otelrabbitmq "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
)

type DialConfig struct {
URL string
otelrabbitmq.DialConfig
Durable bool
Vhost string
Auth amqp.Authentication
ConnectionTimeout time.Duration
Heartbeat time.Duration
PublishConfirmationTimeout time.Duration
TLS *tls.Config
ConnectionName string
}

type Message struct {
Expand All @@ -33,20 +27,20 @@ type Message struct {
Body []byte
}

func NewConnection(logger *zap.Logger, client AmqpClient, config DialConfig) (Publisher, error) {
func NewConnection(logger *zap.Logger, client otelrabbitmq.AmqpClient, config DialConfig) (Publisher, error) {
p := publisher{
logger: logger,
client: client,
config: config,
connLock: &sync.Mutex{},
connectionErrors: make(chan *amqp.Error, 1),
logger: logger,
client: client,
config: config,
}

p.connLock.Lock()
defer p.connLock.Unlock()
err := p.connect()
conn, err := p.client.DialConfig(p.config.DialConfig)
if err != nil {
return &p, err
}
p.connection = conn

return &p, err
return &p, nil
}

type Publisher interface {
Expand All @@ -55,16 +49,14 @@ type Publisher interface {
}

type publisher struct {
logger *zap.Logger
client AmqpClient
config DialConfig
connLock *sync.Mutex
connection Connection
connectionErrors chan *amqp.Error
logger *zap.Logger
client otelrabbitmq.AmqpClient
config DialConfig
connection otelrabbitmq.Connection
}

func (p *publisher) Publish(ctx context.Context, message Message) error {
err := p.reconnectIfUnhealthy()
err := p.connection.ReconnectIfUnhealthy()
if err != nil {
return err
}
Expand All @@ -73,7 +65,7 @@ func (p *publisher) Publish(ctx context.Context, message Message) error {
// This could later be optimized to re-use channels which avoids repeated network calls to create and close them.
// Concurrency-control through something like a resource pool would be necessary since amqp channels are not thread safe.
channel, err := p.connection.Channel()
defer func(channel Channel) {
defer func(channel otelrabbitmq.Channel) {
if channel != nil {
err2 := channel.Close()
if err2 != nil {
Expand Down Expand Up @@ -125,71 +117,9 @@ func (p *publisher) Publish(ctx context.Context, message Message) error {
}
}

func (p *publisher) reconnectIfUnhealthy() error {
p.connLock.Lock()
defer p.connLock.Unlock()

hasConnectionError := false
select {
case err := <-p.connectionErrors:
hasConnectionError = true
p.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err))
default:
break
}

if hasConnectionError || !p.isConnected() {
if p.isConnected() {
err := p.connection.Close()
if err != nil {
p.logger.Warn("Error closing unhealthy connection", zap.Error(err))
}
}

if err := p.connect(); err != nil {
return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err)
}
p.logger.Info("Successfully restored unhealthy rabbitmq connection")
}

return nil
}

func (p *publisher) connect() error {
p.logger.Debug("Connecting to rabbitmq")

properties := amqp.Table{}
properties.SetClientConnectionName(p.config.ConnectionName)

connection, err := p.client.DialConfig(p.config.URL, amqp.Config{
SASL: []amqp.Authentication{p.config.Auth},
Vhost: p.config.Vhost,
Heartbeat: p.config.Heartbeat,
Dial: amqp.DefaultDial(p.config.ConnectionTimeout),
Properties: properties,
TLSClientConfig: p.config.TLS,
})
if connection != nil {
p.connection = connection
}
if err != nil {
return err
}

// Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors.
// Also re-create this channel each time because apparently the amqp library can close it
p.connectionErrors = make(chan *amqp.Error, 1)
p.connection.NotifyClose(p.connectionErrors)
return nil
}

func (p *publisher) Close() error {
if p.isConnected() {
return p.connection.Close()
if p.connection == nil {
return nil
}
return nil
}

func (p *publisher) isConnected() bool {
return p.connection != nil && !p.connection.IsClosed()
return p.connection.Close()
}
Loading