Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ When Async is enabled, if this is callback is provided, it will be called on eve
takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the
delivery of the message was unsuccessful.

### AsyncReconnectInterval
When async is enabled, this option defines the interval (ms) at which the connection
to the fluentd-address is re-established. This option is useful if the address
may resolve to one or more IP addresses, e.g. a Consul service address.

### SubSecondPrecision

Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.
Expand Down
20 changes: 20 additions & 0 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ type Config struct {
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`

// AsyncReconnectInterval defines the interval (ms) at which the connection
// to the fluentd-address is re-established. This option is useful if the address
// may resolve to one or more IP addresses, e.g. a Consul service address.
AsyncReconnectInterval int64 `json:"async_reconnect_interval"`

// Sub-second precision timestamps are only possible for those using fluentd
// v0.14+ and serializing their messages with msgpack.
SubSecondPrecision bool `json:"sub_second_precision"`
Expand Down Expand Up @@ -108,6 +113,9 @@ type Fluent struct {
closed bool
wg sync.WaitGroup

// time at which the most recent connection to fluentd-address was established.
latestReconnectTime time.Time

muconn sync.RWMutex
conn net.Conn
}
Expand Down Expand Up @@ -447,6 +455,10 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
}

if err == nil {
f.latestReconnectTime = time.Now()
}

return err
}

Expand Down Expand Up @@ -508,6 +520,14 @@ func (f *Fluent) run(ctx context.Context) {
return
}

if f.AsyncReconnectInterval > 0 {
if time.Since(f.latestReconnectTime) > time.Duration(f.AsyncReconnectInterval)*time.Millisecond {
f.muconn.Lock()
f.connectWithRetry(ctx)
f.muconn.Unlock()
}
}

err := f.writeWithRetry(ctx, entry)
if err != nil && err != errIsClosing {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
Expand Down
2 changes: 1 addition & 1 deletion fluent/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package fluent

const Version = "1.4.0"
const Version = "1.9.0"
Copy link
Contributor Author

@conorevans conorevans Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is evidently outdated but I figured that it can't be removed for backwards compatibility, in which case it can be updated at least.