Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ When Async is enabled, if this is callback is provided, it will be called on eve
takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the
delivery of the message was unsuccessful.

### AsyncReconnectInterval
When async is enabled, this option defines the interval (ms) at which the connection
to the fluentd-address is re-established. This option is useful if the address
may resolve to one or more IP addresses, e.g. a Consul service address.

### SubSecondPrecision

Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.
Expand Down
27 changes: 27 additions & 0 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ type Config struct {
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`

// AsyncReconnectInterval defines the interval (ms) at which the connection
// to the fluentd-address is re-established. This option is useful if the address
// may resolve to one or more IP addresses, e.g. a Consul service address.
AsyncReconnectInterval int64 `json:"async_reconnect_interval"`

// Sub-second precision timestamps are only possible for those using fluentd
// v0.14+ and serializing their messages with msgpack.
SubSecondPrecision bool `json:"sub_second_precision"`
Expand Down Expand Up @@ -108,6 +113,10 @@ type Fluent struct {
closed bool
wg sync.WaitGroup

// time (unix milliseconds) at which the most recent connection to fluentd
// address was established.
latestReconnectTime int64
Copy link

@featherbread featherbread Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be time.Time instead of a Unix millisecond count, since that provides a monotonic clock for the later comparison operations. The check would be something like:

if time.Since(f.latestReconnectTime) > f.AsyncReconnectInterval*time.Millisecond {
    // reconnect
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied 👍 (just had to transform the int64 AsyncReconnectInterval to a time.Duration). It makes the code cleaner (no need for comments) and uses time native methods, thanks for the suggestion.

On the first tick of the for loop in run now (if AsyncReconnectInterval is set), the first connection will now be made by the code in the loop, whereas it previously did so in writeWithRetry. This is because of the time.Time "zero" value. We could avoid this by doing !f.latestReconnectTime.IsZero() but I don't see the need for this check, as it's the same connection call made, just in a different place, and the condition is only relevant the first run, and would somewhat bloat the code, in my opinion. Just wanted to call it out in case the maintainers here prefer that I do that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, was it failing to compile without the explicit time.Duration(…) conversion? I tried something like that in the playground and it seemed to work.

Copy link
Contributor Author

@conorevans conorevans Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's because the reconnectInterval there is a const (an unsigned integer constant). When it's an attribute of a struct, like here, it needs to be cast: https://go.dev/play/p/WQP-uCAAh1h

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, you're absolutely right, I should have recognized that the difference was the untyped constant.


muconn sync.RWMutex
conn net.Conn
}
Expand Down Expand Up @@ -447,6 +456,13 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
}

if err == nil {
// time.Now().UnixMilli() is possible but only introduced in Go 1.17
// AsyncReconnectInterval is defined in ms in line with other options
// here, so we want to use ms throughout.
f.latestReconnectTime = time.Now().Unix() * int64(time.Millisecond)
}

return err
}

Expand Down Expand Up @@ -508,6 +524,17 @@ func (f *Fluent) run(ctx context.Context) {
return
}

if f.AsyncReconnectInterval > 0 {
// time.Now().UnixMilli() is possible but only introduced in Go 1.17.
// AsyncReconnectInterval is defined in ms in line with other options
// here, so we want to use ms throughout.
if now := time.Now().Unix() * int64(time.Millisecond); now > (f.latestReconnectTime + f.AsyncReconnectInterval) {
f.muconn.Lock()
f.connectWithRetry(ctx)
f.muconn.Unlock()
}
}

err := f.writeWithRetry(ctx, entry)
if err != nil && err != errIsClosing {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
Expand Down
2 changes: 1 addition & 1 deletion fluent/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package fluent

const Version = "1.4.0"
const Version = "1.9.0"
Copy link
Contributor Author

@conorevans conorevans Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is evidently outdated but I figured that it can't be removed for backwards compatibility, in which case it can be updated at least.