Skip to content

Commit

Permalink
[exporter/syslog] Fix issue where exporter may hang indefinitely whil…
Browse files Browse the repository at this point in the history
…e dialing (open-telemetry#34393)

We had a report of a collector not shutting down properly. Regardless of
the underlying cause, it seems that the dialer was not being passed a
context, so did not cancel the attempt to dial in a reasonable
timeframe. This PR just passes context through to the dialer.

<img width="385" alt="image"
src="https://github.com/user-attachments/assets/5a7bb39e-6f91-4a46-9ec3-97df030de4c6">
  • Loading branch information
djaglowski authored and pull[bot] committed Oct 9, 2024
1 parent 937d007 commit f812ea5
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
27 changes: 27 additions & 0 deletions .chloggen/syslog-exporter-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: syslogexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where exporter may hang indefinitely while dialing.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34393]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 9 additions & 9 deletions exporter/syslogexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,18 @@ func newLogsExporter(
)
}

func (se *syslogexporter) pushLogsData(_ context.Context, logs plog.Logs) error {
func (se *syslogexporter) pushLogsData(ctx context.Context, logs plog.Logs) error {
batchMessages := se.config.Network == string(confignet.TransportTypeTCP)
var err error
if batchMessages {
err = se.exportBatch(logs)
err = se.exportBatch(ctx, logs)
} else {
err = se.exportNonBatch(logs)
err = se.exportNonBatch(ctx, logs)
}
return err
}

func (se *syslogexporter) exportBatch(logs plog.Logs) error {
func (se *syslogexporter) exportBatch(ctx context.Context, logs plog.Logs) error {
var payload strings.Builder
for i := 0; i < logs.ResourceLogs().Len(); i++ {
resourceLogs := logs.ResourceLogs().At(i)
Expand All @@ -99,21 +99,21 @@ func (se *syslogexporter) exportBatch(logs plog.Logs) error {
}

if payload.Len() > 0 {
sender, err := connect(se.logger, se.config, se.tlsConfig)
sender, err := connect(ctx, se.logger, se.config, se.tlsConfig)
if err != nil {
return consumererror.NewLogs(err, logs)
}
defer sender.close()
err = sender.Write(payload.String())
err = sender.Write(ctx, payload.String())
if err != nil {
return consumererror.NewLogs(err, logs)
}
}
return nil
}

func (se *syslogexporter) exportNonBatch(logs plog.Logs) error {
sender, err := connect(se.logger, se.config, se.tlsConfig)
func (se *syslogexporter) exportNonBatch(ctx context.Context, logs plog.Logs) error {
sender, err := connect(ctx, se.logger, se.config, se.tlsConfig)
if err != nil {
return consumererror.NewLogs(err, logs)
}
Expand All @@ -130,7 +130,7 @@ func (se *syslogexporter) exportNonBatch(logs plog.Logs) error {
for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)
formatted := se.formatter.format(logRecord)
err = sender.Write(formatted)
err = sender.Write(ctx, formatted)
if err != nil {
errs = append(errs, err)
droppedLogRecord := droppedScopeLogs.LogRecords().AppendEmpty()
Expand Down
17 changes: 10 additions & 7 deletions exporter/syslogexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter"

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand Down Expand Up @@ -42,7 +43,7 @@ type sender struct {
conn net.Conn
}

func connect(logger *zap.Logger, cfg *Config, tlsConfig *tls.Config) (*sender, error) {
func connect(ctx context.Context, logger *zap.Logger, cfg *Config, tlsConfig *tls.Config) (*sender, error) {
s := &sender{
logger: logger,
network: cfg.Network,
Expand All @@ -54,7 +55,7 @@ func connect(logger *zap.Logger, cfg *Config, tlsConfig *tls.Config) (*sender, e
s.mu.Lock()
defer s.mu.Unlock()

err := s.dial()
err := s.dial(ctx)
if err != nil {
return nil, err
}
Expand All @@ -73,21 +74,23 @@ func (s *sender) close() error {
return nil
}

func (s *sender) dial() error {
func (s *sender) dial(ctx context.Context) error {
if s.conn != nil {
s.conn.Close()
s.conn = nil
}
var err error
if s.tlsConfig != nil && s.network == string(confignet.TransportTypeTCP) {
s.conn, err = tls.Dial(s.network, s.addr, s.tlsConfig)
dialer := tls.Dialer{Config: s.tlsConfig}
s.conn, err = dialer.DialContext(ctx, s.network, s.addr)
} else {
s.conn, err = net.Dial(s.network, s.addr)
dialer := new(net.Dialer)
s.conn, err = dialer.DialContext(ctx, s.network, s.addr)
}
return err
}

func (s *sender) Write(msgStr string) error {
func (s *sender) Write(ctx context.Context, msgStr string) error {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -96,7 +99,7 @@ func (s *sender) Write(msgStr string) error {
return nil
}
}
if err := s.dial(); err != nil {
if err := s.dial(ctx); err != nil {
return err
}

Expand Down

0 comments on commit f812ea5

Please sign in to comment.