diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cd4673e48508..bcd553189499 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -137,6 +137,7 @@ internal/kubelet/ @open-teleme internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole internal/otelarrow/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3 internal/pdatautil/ @open-telemetry/collector-contrib-approvers @djaglowski +internal/rabbitmq/ @open-telemetry/collector-contrib-approvers @swar8080 @atoulme internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax internal/sqlquery/ @open-telemetry/collector-contrib-approvers @crobert-1 @dmitryax diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index 64313e37a7b3..e133885c9f32 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -134,6 +134,7 @@ body: - internal/metadataproviders - internal/otelarrow - internal/pdatautil + - internal/rabbitmq - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index 3f7041dab759..ab984e806589 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -128,6 +128,7 @@ body: - internal/metadataproviders - internal/otelarrow - internal/pdatautil + - internal/rabbitmq - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index 453af8486fb5..c76b0ddadb8a 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -128,6 +128,7 @@ body: - internal/metadataproviders - internal/otelarrow - internal/pdatautil + - internal/rabbitmq - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/.github/ISSUE_TEMPLATE/unmaintained.yaml b/.github/ISSUE_TEMPLATE/unmaintained.yaml index d89b2119d78e..890f4bf0e43a 100644 --- a/.github/ISSUE_TEMPLATE/unmaintained.yaml +++ b/.github/ISSUE_TEMPLATE/unmaintained.yaml @@ -133,6 +133,7 @@ body: - internal/metadataproviders - internal/otelarrow - internal/pdatautil + - internal/rabbitmq - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index 5c0223ea06bb..f51b9b0c88c6 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -355,6 +355,7 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig => ../../internal/k8sconfig - github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest - github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka + - github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ../../receiver/carbonreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter => ../../exporter/splunkhecexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter => ../../exporter/prometheusexporter diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index bc676df9de31..dadfef38f918 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -640,6 +640,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.107.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.107.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.107.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq v0.107.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.107.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.107.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.107.0 // indirect @@ -1073,6 +1074,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8ste replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq + replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ../../receiver/carbonreceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter => ../../exporter/splunkhecexporter diff --git a/exporter/rabbitmqexporter/factory.go b/exporter/rabbitmqexporter/factory.go index ed71c2a5d52a..2c08f98aaf3e 100644 --- a/exporter/rabbitmqexporter/factory.go +++ b/exporter/rabbitmqexporter/factory.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq" ) const ( @@ -133,7 +134,7 @@ func getRoutingKeyOrDefault(config *Config, fallback string) string { func newPublisherFactory(set exporter.Settings) publisherFactory { return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) { - return publisher.NewConnection(set.Logger, publisher.NewAmqpClient(), dialConfig) + return publisher.NewConnection(set.Logger, rabbitmq.NewAmqpClient(set.Logger), dialConfig) } } diff --git a/exporter/rabbitmqexporter/go.mod b/exporter/rabbitmqexporter/go.mod index 7de3bd3ae2f3..c9b7bffbb250 100644 --- a/exporter/rabbitmqexporter/go.mod +++ b/exporter/rabbitmqexporter/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.107.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq v0.107.0 github.com/rabbitmq/amqp091-go v1.10.0 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.31.0 @@ -110,6 +111,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest diff --git a/exporter/rabbitmqexporter/internal/publisher/client.go b/exporter/rabbitmqexporter/internal/publisher/client.go deleted file mode 100644 index 905d7da564a1..000000000000 --- a/exporter/rabbitmqexporter/internal/publisher/client.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package publisher // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" - -import ( - "context" - - amqp "github.com/rabbitmq/amqp091-go" -) - -type AmqpClient interface { - DialConfig(url string, config amqp.Config) (Connection, error) -} - -type Connection interface { - IsClosed() bool - Channel() (Channel, error) - NotifyClose(receiver chan *amqp.Error) chan *amqp.Error - Close() error -} - -type Channel interface { - Confirm(noWait bool) error - PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) - IsClosed() bool - Close() error -} - -type DeferredConfirmation interface { - Done() <-chan struct{} - Acked() bool -} - -func NewAmqpClient() AmqpClient { - return &client{} -} - -type client struct{} - -type connectionHolder struct { - connection *amqp.Connection -} - -type channelHolder struct { - channel *amqp.Channel -} - -type deferredConfirmationHolder struct { - confirmation *amqp.DeferredConfirmation -} - -func (*client) DialConfig(url string, config amqp.Config) (Connection, error) { - con, err := amqp.DialConfig(url, config) - if err != nil { - return nil, err - } - - return &connectionHolder{ - connection: con, - }, nil -} - -func (c *connectionHolder) Channel() (Channel, error) { - channel, err := c.connection.Channel() - if err != nil { - return nil, err - } - return &channelHolder{channel: channel}, nil -} - -func (c *connectionHolder) IsClosed() bool { - return c.connection.IsClosed() -} - -func (c *connectionHolder) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { - return c.connection.NotifyClose(receiver) -} - -func (c *connectionHolder) Close() error { - return c.connection.Close() -} - -func (c *channelHolder) Confirm(noWait bool) error { - return c.channel.Confirm(noWait) -} - -func (c *channelHolder) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) { - confirmation, err := c.channel.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) - if err != nil { - return nil, err - } - return &deferredConfirmationHolder{confirmation: confirmation}, nil -} - -func (c *channelHolder) IsClosed() bool { - return c.channel.IsClosed() -} - -func (c *channelHolder) Close() error { - return c.channel.Close() -} - -func (d *deferredConfirmationHolder) Done() <-chan struct{} { - return d.confirmation.Done() -} - -func (d *deferredConfirmationHolder) Acked() bool { - return d.confirmation.Acked() -} diff --git a/exporter/rabbitmqexporter/internal/publisher/publisher.go b/exporter/rabbitmqexporter/internal/publisher/publisher.go index c4a6b994ba74..8a4ec63c688c 100644 --- a/exporter/rabbitmqexporter/internal/publisher/publisher.go +++ b/exporter/rabbitmqexporter/internal/publisher/publisher.go @@ -5,26 +5,20 @@ package publisher // import "github.com/open-telemetry/opentelemetry-collector-c import ( "context" - "crypto/tls" "errors" "fmt" - "sync" "time" amqp "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" + + otelrabbitmq "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq" ) type DialConfig struct { - URL string + otelrabbitmq.DialConfig Durable bool - Vhost string - Auth amqp.Authentication - ConnectionTimeout time.Duration - Heartbeat time.Duration PublishConfirmationTimeout time.Duration - TLS *tls.Config - ConnectionName string } type Message struct { @@ -33,20 +27,20 @@ type Message struct { Body []byte } -func NewConnection(logger *zap.Logger, client AmqpClient, config DialConfig) (Publisher, error) { +func NewConnection(logger *zap.Logger, client otelrabbitmq.AmqpClient, config DialConfig) (Publisher, error) { p := publisher{ - logger: logger, - client: client, - config: config, - connLock: &sync.Mutex{}, - connectionErrors: make(chan *amqp.Error, 1), + logger: logger, + client: client, + config: config, } - p.connLock.Lock() - defer p.connLock.Unlock() - err := p.connect() + conn, err := p.client.DialConfig(p.config.DialConfig) + if err != nil { + return &p, err + } + p.connection = conn - return &p, err + return &p, nil } type Publisher interface { @@ -55,16 +49,14 @@ type Publisher interface { } type publisher struct { - logger *zap.Logger - client AmqpClient - config DialConfig - connLock *sync.Mutex - connection Connection - connectionErrors chan *amqp.Error + logger *zap.Logger + client otelrabbitmq.AmqpClient + config DialConfig + connection otelrabbitmq.Connection } func (p *publisher) Publish(ctx context.Context, message Message) error { - err := p.reconnectIfUnhealthy() + err := p.connection.ReconnectIfUnhealthy() if err != nil { return err } @@ -73,7 +65,7 @@ func (p *publisher) Publish(ctx context.Context, message Message) error { // This could later be optimized to re-use channels which avoids repeated network calls to create and close them. // Concurrency-control through something like a resource pool would be necessary since aqmp channels are not thread safe. channel, err := p.connection.Channel() - defer func(channel Channel) { + defer func(channel otelrabbitmq.Channel) { if channel != nil { err2 := channel.Close() if err2 != nil { @@ -125,71 +117,9 @@ func (p *publisher) Publish(ctx context.Context, message Message) error { } } -func (p *publisher) reconnectIfUnhealthy() error { - p.connLock.Lock() - defer p.connLock.Unlock() - - hasConnectionError := false - select { - case err := <-p.connectionErrors: - hasConnectionError = true - p.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err)) - default: - break - } - - if hasConnectionError || !p.isConnected() { - if p.isConnected() { - err := p.connection.Close() - if err != nil { - p.logger.Warn("Error closing unhealthy connection", zap.Error(err)) - } - } - - if err := p.connect(); err != nil { - return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err) - } - p.logger.Info("Successfully restored unhealthy rabbitmq connection") - } - - return nil -} - -func (p *publisher) connect() error { - p.logger.Debug("Connecting to rabbitmq") - - properties := amqp.Table{} - properties.SetClientConnectionName(p.config.ConnectionName) - - connection, err := p.client.DialConfig(p.config.URL, amqp.Config{ - SASL: []amqp.Authentication{p.config.Auth}, - Vhost: p.config.Vhost, - Heartbeat: p.config.Heartbeat, - Dial: amqp.DefaultDial(p.config.ConnectionTimeout), - Properties: properties, - TLSClientConfig: p.config.TLS, - }) - if connection != nil { - p.connection = connection - } - if err != nil { - return err - } - - // Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors. - // Also re-create this channel each time because apparently the amqp library can close it - p.connectionErrors = make(chan *amqp.Error, 1) - p.connection.NotifyClose(p.connectionErrors) - return nil -} - func (p *publisher) Close() error { - if p.isConnected() { - return p.connection.Close() + if p.connection == nil { + return nil } - return nil -} - -func (p *publisher) isConnected() bool { - return p.connection != nil && !p.connection.IsClosed() + return p.connection.Close() } diff --git a/exporter/rabbitmqexporter/internal/publisher/publisher_test.go b/exporter/rabbitmqexporter/internal/publisher/publisher_test.go index 5d8b63693e48..7eddda97a99d 100644 --- a/exporter/rabbitmqexporter/internal/publisher/publisher_test.go +++ b/exporter/rabbitmqexporter/internal/publisher/publisher_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq" ) const ( @@ -23,14 +25,18 @@ const ( ) func TestConnectAndClose(t *testing.T) { - client := mockClient{} connection := mockConnection{} + client := mockClient{ + conn: &connection, + } dialConfig := DialConfig{ - URL: connectURL, + DialConfig: rabbitmq.DialConfig{ + URL: connectURL, + }, } // Start the connection successfully - client.On("DialConfig", connectURL, mock.Anything).Return(&connection, nil) + client.On("DialConfig", mock.Anything).Return(&connection, nil) connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)) publisher, err := NewConnection(zap.NewNop(), &client, dialConfig) @@ -39,7 +45,6 @@ func TestConnectAndClose(t *testing.T) { client.AssertExpectations(t) // Close the connection - connection.On("IsClosed").Return(false) connection.On("Close").Return(nil) err = publisher.Close() @@ -49,12 +54,18 @@ func TestConnectAndClose(t *testing.T) { } func TestConnectionErrorAndClose(t *testing.T) { - client := mockClient{} + connection := mockConnection{} + client := mockClient{ + conn: &connection, + } dialConfig := DialConfig{ - URL: connectURL, + DialConfig: rabbitmq.DialConfig{ + URL: connectURL, + }, } - client.On("DialConfig", connectURL, mock.Anything).Return(nil, errors.New("simulated connection error")) + connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)) + client.On("DialConfig", mock.Anything).Return(nil, errors.New("simulated connection error")) publisher, err := NewConnection(zap.NewNop(), &client, dialConfig) assert.EqualError(t, err, "simulated connection error") @@ -82,7 +93,6 @@ func TestPublishAckedWithinTimeout(t *testing.T) { func TestPublishNackedWithinTimeout(t *testing.T) { client, connection, channel, confirmation := setupMocksForSuccessfulPublish() - resetCall(confirmation.ExpectedCalls, "Acked", t) confirmation.On("Acked").Return(false) @@ -163,10 +173,10 @@ func TestRestoreUnhealthyConnectionDuringPublish(t *testing.T) { err = publisher.Publish(context.Background(), makePublishMessage()) require.NoError(t, err) - client.AssertNumberOfCalls(t, "DialConfig", 2) // Connected twice + connection.AssertNumberOfCalls(t, "ReconnectIfUnhealthy", 1) client.AssertExpectations(t) + resetCall(connection.ExpectedCalls, "Close", t) connection.AssertExpectations(t) - connection.AssertNumberOfCalls(t, "Close", 1) channel.AssertExpectations(t) confirmation.AssertExpectations(t) } @@ -178,12 +188,9 @@ func TestRestoreClosedConnectionDuringPublish(t *testing.T) { publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) require.NoError(t, err) - resetCall(connection.ExpectedCalls, "IsClosed", t) - connection.On("IsClosed").Return(true) - err = publisher.Publish(context.Background(), makePublishMessage()) require.NoError(t, err) - client.AssertNumberOfCalls(t, "DialConfig", 2) // Connected twice + client.AssertNumberOfCalls(t, "DialConfig", 1) client.AssertExpectations(t) connection.AssertExpectations(t) channel.AssertExpectations(t) @@ -197,15 +204,13 @@ func TestFailRestoreConnectionDuringPublishing(t *testing.T) { require.NoError(t, err) client.AssertNumberOfCalls(t, "DialConfig", 1) - resetCall(connection.ExpectedCalls, "IsClosed", t) connection.On("IsClosed").Return(true) resetCall(client.ExpectedCalls, "DialConfig", t) client.On("DialConfig", connectURL, mock.Anything).Return(nil, errors.New("simulated connection error")) - err = publisher.Publish(context.Background(), makePublishMessage()) - assert.EqualError(t, err, "failed attempt at restoring unhealthy connection\nsimulated connection error") - client.AssertNumberOfCalls(t, "DialConfig", 2) // Tried reconnecting + _ = publisher.Publish(context.Background(), makePublishMessage()) + client.AssertNumberOfCalls(t, "DialConfig", 1) } func TestErrCreatingChannel(t *testing.T) { @@ -237,7 +242,6 @@ func TestErrSettingChannelConfirmMode(t *testing.T) { func TestErrPublishing(t *testing.T) { client, connection, _, _ := setupMocksForSuccessfulPublish() - // resetCall(channel.ExpectedCalls, "PublishWithDeferredConfirmWithContext") doesn't work so need to recreate the mock channel := mockChannel{} channel.On("Confirm", false).Return(nil) channel.On("PublishWithDeferredConfirmWithContext", mock.Anything, exchange, routingKey, true, false, mock.MatchedBy(isPersistentDeliverMode)).Return(nil, errors.New("simulated error publishing")) @@ -253,15 +257,17 @@ func TestErrPublishing(t *testing.T) { } func setupMocksForSuccessfulPublish() (*mockClient, *mockConnection, *mockChannel, *mockDeferredConfirmation) { - client := mockClient{} connection := mockConnection{} + client := mockClient{ + conn: &connection, + } channel := mockChannel{} confirmation := mockDeferredConfirmation{} - client.On("DialConfig", mock.Anything, mock.Anything).Return(&connection, nil) + client.On("DialConfig", mock.Anything).Return(&connection, nil) + connection.On("ReconnectIfUnhealthy").Return(nil) connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)) connection.On("Channel").Return(&channel, nil) - connection.On("IsClosed").Return(false) channel.On("Confirm", false).Return(nil) channel.On("PublishWithDeferredConfirmWithContext", mock.Anything, exchange, routingKey, true, false, mock.MatchedBy(isPersistentDeliverMode)).Return(&confirmation, nil) @@ -287,18 +293,21 @@ func resetCall(calls []*mock.Call, methodName string, t *testing.T) { return } } + t.Errorf("Faild to reset method %s", methodName) t.FailNow() } type mockClient struct { mock.Mock + conn *mockConnection } -func (m *mockClient) DialConfig(url string, config amqp.Config) (Connection, error) { - args := m.Called(url, config) +func (m *mockClient) DialConfig(config rabbitmq.DialConfig) (rabbitmq.Connection, error) { + args := m.Called(config) + m.conn.NotifyClose(make(chan *amqp.Error, 1)) if connection := args.Get(0); connection != nil { - return connection.(Connection), args.Error(1) + return connection.(rabbitmq.Connection), args.Error(1) } return nil, args.Error(1) } @@ -307,15 +316,20 @@ type mockConnection struct { mock.Mock } +func (m *mockConnection) ReconnectIfUnhealthy() error { + args := m.Called() + return args.Error(0) +} + func (m *mockConnection) IsClosed() bool { args := m.Called() return args.Bool(0) } -func (m *mockConnection) Channel() (Channel, error) { +func (m *mockConnection) Channel() (rabbitmq.Channel, error) { args := m.Called() if channel := args.Get(0); channel != nil { - return channel.(Channel), args.Error(1) + return channel.(rabbitmq.Channel), args.Error(1) } return nil, args.Error(1) } @@ -339,10 +353,10 @@ func (m *mockChannel) Confirm(noWait bool) error { return args.Error(0) } -func (m *mockChannel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) { +func (m *mockChannel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (rabbitmq.DeferredConfirmation, error) { args := m.Called(ctx, exchange, key, mandatory, immediate, msg) if confirmation := args.Get(0); confirmation != nil { - return confirmation.(DeferredConfirmation), args.Error(1) + return confirmation.(rabbitmq.DeferredConfirmation), args.Error(1) } return nil, args.Error(1) } @@ -381,7 +395,9 @@ func makePublishMessage() Message { func makeDialConfig() DialConfig { return DialConfig{ - URL: connectURL, + DialConfig: rabbitmq.DialConfig{ + URL: connectURL, + }, PublishConfirmationTimeout: time.Millisecond * 20, Durable: true, } diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter.go b/exporter/rabbitmqexporter/rabbitmq_exporter.go index 40d068e3f076..d6f26d665d57 100644 --- a/exporter/rabbitmqexporter/rabbitmq_exporter.go +++ b/exporter/rabbitmqexporter/rabbitmq_exporter.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq" ) type rabbitmqExporter struct { @@ -50,17 +51,19 @@ func (e *rabbitmqExporter) start(ctx context.Context, host component.Host) error e.marshaler = m dialConfig := publisher.DialConfig{ - URL: e.config.Connection.Endpoint, - Vhost: e.config.Connection.VHost, - Auth: &amqp.PlainAuth{ - Username: e.config.Connection.Auth.Plain.Username, - Password: e.config.Connection.Auth.Plain.Password, - }, Durable: e.config.Durable, - ConnectionName: e.connectionName, - ConnectionTimeout: e.config.Connection.ConnectionTimeout, - Heartbeat: e.config.Connection.Heartbeat, PublishConfirmationTimeout: e.config.Connection.PublishConfirmationTimeout, + DialConfig: rabbitmq.DialConfig{ + URL: e.config.Connection.Endpoint, + Vhost: e.config.Connection.VHost, + Auth: &amqp.PlainAuth{ + Username: e.config.Connection.Auth.Plain.Username, + Password: e.config.Connection.Auth.Plain.Password, + }, + ConnectionName: e.connectionName, + ConnectionTimeout: e.config.Connection.ConnectionTimeout, + Heartbeat: e.config.Connection.Heartbeat, + }, } tlsConfig, err := e.tlsFactory(ctx) diff --git a/internal/rabbitmq/Makefile b/internal/rabbitmq/Makefile new file mode 100644 index 000000000000..ded7a36092dc --- /dev/null +++ b/internal/rabbitmq/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/internal/rabbitmq/client.go b/internal/rabbitmq/client.go new file mode 100644 index 000000000000..9fa868da3397 --- /dev/null +++ b/internal/rabbitmq/client.go @@ -0,0 +1,201 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq" + +import ( + "context" + "crypto/tls" + "errors" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "go.uber.org/zap" +) + +type AmqpClient interface { + DialConfig(config DialConfig) (Connection, error) +} + +type Connection interface { + ReconnectIfUnhealthy() error + IsClosed() bool + Channel() (Channel, error) + NotifyClose(receiver chan *amqp.Error) chan *amqp.Error + Close() error +} + +type Channel interface { + Confirm(noWait bool) error + PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) + IsClosed() bool + Close() error +} + +type DeferredConfirmation interface { + Done() <-chan struct{} + Acked() bool +} + +type connectionHolder struct { + url string + config amqp.Config + connection *amqp.Connection + logger *zap.Logger + connLock *sync.Mutex + connectionErrors chan *amqp.Error +} + +type channelHolder struct { + channel *amqp.Channel +} + +type deferredConfirmationHolder struct { + confirmation *amqp.DeferredConfirmation +} + +type DialConfig struct { + URL string + Vhost string + Auth amqp.Authentication + ConnectionTimeout time.Duration + Heartbeat time.Duration + TLS *tls.Config + ConnectionName string +} + +func NewAmqpClient(logger *zap.Logger) AmqpClient { + return &client{logger: logger} +} + +type client struct { + logger *zap.Logger +} + +func (c *client) DialConfig(config DialConfig) (Connection, error) { + properties := amqp.Table{} + properties.SetClientConnectionName(config.ConnectionName) + ch := &connectionHolder{ + url: config.URL, + config: amqp.Config{ + SASL: []amqp.Authentication{config.Auth}, + Vhost: config.Vhost, + TLSClientConfig: config.TLS, + Heartbeat: config.Heartbeat, + Dial: amqp.DefaultDial(config.ConnectionTimeout), + Properties: properties, + }, + logger: c.logger, + connLock: &sync.Mutex{}, + connectionErrors: make(chan *amqp.Error, 1), + } + + ch.connLock.Lock() + defer ch.connLock.Unlock() + + return ch, ch.connect() +} + +func (c *connectionHolder) ReconnectIfUnhealthy() error { + c.connLock.Lock() + defer c.connLock.Unlock() + + hasConnectionError := false + select { + case err := <-c.connectionErrors: + hasConnectionError = true + c.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err)) + default: + break + } + + if hasConnectionError || !c.isConnected() { + if c.isConnected() { + err := c.connection.Close() + if err != nil { + c.logger.Warn("Error closing unhealthy connection", zap.Error(err)) + } + } + + if err := c.connect(); err != nil { + return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err) + } + c.logger.Info("Successfully restored unhealthy rabbitmq connection") + } + + return nil +} + +func (c *connectionHolder) connect() error { + c.logger.Debug("Connecting to rabbitmq") + + connection, err := amqp.DialConfig(c.url, c.config) + if connection != nil { + c.connection = connection + } + if err != nil { + return err + } + + // Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors. + // Also re-create this channel each time because apparently the amqp library can close it + c.connectionErrors = make(chan *amqp.Error, 1) + c.connection.NotifyClose(c.connectionErrors) + return nil +} + +func (c *connectionHolder) Close() error { + if c.isConnected() { + return c.connection.Close() + } + return nil +} + +func (c *connectionHolder) isConnected() bool { + return c.connection != nil && !c.IsClosed() +} + +func (c *connectionHolder) Channel() (Channel, error) { + channel, err := c.connection.Channel() + if err != nil { + return nil, err + } + return &channelHolder{channel: channel}, nil +} + +func (c *connectionHolder) IsClosed() bool { + return c.connection.IsClosed() +} + +func (c *connectionHolder) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { + return c.connection.NotifyClose(receiver) +} + +func (c *channelHolder) Confirm(noWait bool) error { + return c.channel.Confirm(noWait) +} + +func (c *channelHolder) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) { + confirmation, err := c.channel.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) + if err != nil { + return nil, err + } + return &deferredConfirmationHolder{confirmation: confirmation}, nil +} + +func (c *channelHolder) IsClosed() bool { + return c.channel.IsClosed() +} + +func (c *channelHolder) Close() error { + return c.channel.Close() +} + +func (d *deferredConfirmationHolder) Done() <-chan struct{} { + return d.confirmation.Done() +} + +func (d *deferredConfirmationHolder) Acked() bool { + return d.confirmation.Acked() +} diff --git a/internal/rabbitmq/client_test.go b/internal/rabbitmq/client_test.go new file mode 100644 index 000000000000..3fbff7782bcb --- /dev/null +++ b/internal/rabbitmq/client_test.go @@ -0,0 +1,181 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "context" + "crypto/tls" + "sync" + "testing" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" +) + +type MockConnection struct { + mock.Mock +} + +func (m *MockConnection) ReconnectIfUnhealthy() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockConnection) IsClosed() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *MockConnection) Channel() (Channel, error) { + args := m.Called() + return args.Get(0).(Channel), args.Error(1) +} + +func (m *MockConnection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { + args := m.Called(receiver) + return args.Get(0).(chan *amqp.Error) +} + +func (m *MockConnection) Close() error { + args := m.Called() + return args.Error(0) +} + +type MockChannel struct { + mock.Mock +} + +func (m *MockChannel) Confirm(noWait bool) error { + args := m.Called(noWait) + return args.Error(0) +} + +func (m *MockChannel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) { + args := m.Called(ctx, exchange, key, mandatory, immediate, msg) + return args.Get(0).(DeferredConfirmation), args.Error(1) +} + +func (m *MockChannel) IsClosed() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *MockChannel) Close() error { + args := m.Called() + return args.Error(0) +} + +type MockDeferredConfirmation struct { + mock.Mock +} + +func (m *MockDeferredConfirmation) Done() <-chan struct{} { + args := m.Called() + return args.Get(0).(chan struct{}) +} + +func (m *MockDeferredConfirmation) Acked() bool { + args := m.Called() + return args.Bool(0) +} + +func TestDialConfig(t *testing.T) { + logger := zap.NewNop() + client := NewAmqpClient(logger) + + config := DialConfig{ + URL: "amqp://guest:guest@localhost:5672/", + Vhost: "/", + Auth: &amqp.PlainAuth{Username: "guest", Password: "guest"}, + ConnectionTimeout: 10 * time.Second, + Heartbeat: 10 * time.Second, + TLS: &tls.Config{}, + ConnectionName: "test-connection", + } + + conn, err := client.DialConfig(config) + assert.ErrorContains(t, err, "connect: connection refused") + assert.NotNil(t, conn) +} + +func TestReconnectIfUnhealthy(t *testing.T) { + connection := &connectionHolder{ + logger: zap.NewNop(), + connLock: &sync.Mutex{}, + connectionErrors: make(chan *amqp.Error, 1), + url: "amqp://guest:guest@localhost:5672/", + config: amqp.Config{ + Vhost: "/", + }, + } + + connection.connectionErrors <- &amqp.Error{ + Code: 0, + Reason: "mock error", + Server: false, + Recover: false, + } + + err := connection.ReconnectIfUnhealthy() + assert.ErrorContains(t, err, "connect: connection refused") +} + +func TestIsConnected(t *testing.T) { + logger := zap.NewNop() + connection := &connectionHolder{ + logger: logger, + connLock: &sync.Mutex{}, + } + + assert.False(t, connection.isConnected()) +} + +func TestChannel(t *testing.T) { + mockConn := new(MockConnection) + mockChan := new(MockChannel) + + mockConn.On("Channel").Return(mockChan, nil) + mockChan.On("Confirm", false).Return(nil) + mockChan.On("PublishWithDeferredConfirmWithContext", mock.Anything, "exchange", "key", false, false, mock.Anything).Return(new(MockDeferredConfirmation), nil) + mockChan.On("IsClosed").Return(false) + mockChan.On("Close").Return(nil) + + channel, err := mockConn.Channel() + assert.NoError(t, err) + assert.NotNil(t, channel) + + err = channel.Confirm(false) + assert.NoError(t, err) + + ctx := context.Background() + deferredConf, err := channel.PublishWithDeferredConfirmWithContext(ctx, "exchange", "key", false, false, amqp.Publishing{}) + assert.NoError(t, err) + assert.NotNil(t, deferredConf) + + assert.False(t, channel.IsClosed()) + + err = channel.Close() + assert.NoError(t, err) + + mockConn.AssertExpectations(t) + mockChan.AssertExpectations(t) +} + +func TestPublishWithDeferredConfirmWithContext(t *testing.T) { + mockChan := new(MockChannel) + mockDefConf := new(MockDeferredConfirmation) + ctx := context.Background() + + mockChan.On("PublishWithDeferredConfirmWithContext", ctx, "exchange", "key", false, false, mock.Anything).Return(mockDefConf, nil) + + deferredConf, err := mockChan.PublishWithDeferredConfirmWithContext(ctx, "exchange", "key", false, false, amqp.Publishing{}) + assert.NoError(t, err) + assert.NotNil(t, deferredConf) + + mockChan.AssertExpectations(t) + mockDefConf.AssertExpectations(t) +} diff --git a/internal/rabbitmq/go.mod b/internal/rabbitmq/go.mod new file mode 100644 index 000000000000..c68e23f7931e --- /dev/null +++ b/internal/rabbitmq/go.mod @@ -0,0 +1,17 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq + +go 1.22.0 + +require ( + github.com/rabbitmq/amqp091-go v1.10.0 + github.com/stretchr/testify v1.8.1 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/internal/rabbitmq/go.sum b/internal/rabbitmq/go.sum new file mode 100644 index 000000000000..380da57625a3 --- /dev/null +++ b/internal/rabbitmq/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/rabbitmq/metadata.yaml b/internal/rabbitmq/metadata.yaml new file mode 100644 index 000000000000..416b20ab1932 --- /dev/null +++ b/internal/rabbitmq/metadata.yaml @@ -0,0 +1,3 @@ +status: + codeowners: + active: [swar8080, atoulme] diff --git a/versions.yaml b/versions.yaml index 3b4f9650861e..2b5fc633a95a 100644 --- a/versions.yaml +++ b/versions.yaml @@ -132,6 +132,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet - github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders - github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil + - github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq - github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent - github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk