Skip to content

Conversation

@conorevans
Copy link
Contributor

@conorevans conorevans commented Dec 14, 2021

Signed-off-by: Conor Evans coevans@tcd.ie

Closes #110

Motivation:

In cases where the FluentHost can resolve to one of many IPs (e.g. a Consul service address), periodic reconnection is a desirable feature. In a Consul example, a member can be marked unhealthy and leave the service pool, or new members can be added to the service pool. While ForceStopAsyncSend is welcomed in that it prevents Docker containers hanging, but it doesn't prevent log loss, and a single application with a log spike can still overwhelm a fluentd worker. If periodic reconnection were in place, FluentHost could resolve to a new address and continue logging.

Implementation:

I initialize considered spawning a separate goroutine reconnect with a Ticker, but thought that closing the goroutine reliably could be difficult, and that if the AsyncReconnectInterval were too long (even 5s would be a pretty long time to come back around to a new time to select the stopRunning channel and end the goroutine). So I piggybacked on run and used a lastReconnectTime implementation instead. I lock/unlock the muconn around the call as directed by connectWithRetry (no defer since we want it unlocked straight away for writeWithRetry soon thereafter.

Considerations:
I don't see any performance tests that can be benchmarked, but I tested two Docker applications side-by-side, one with the fluentd logging driver, the other using fluent.Post based on my fork, and the applications were logging consistently (w.r.t volume).

I went with ms as the unit for AsyncReconnectInterval because it's consistent with other config attributes (e.g. RetryWait). Since this library is expected to support go1.13, go1.16, and go1.17 per the CI config, I couldn't use the UnixMilli method from go1.17 -- at some point in the future the comments can be removed and the code updated.

Testing:
Use a FluentHost that can resolve to one or more IPs (e.g. a Consul service address). Observe that it is possible for the FluentHost to change now that reconnection occurs.

Signed-off-by: Conor Evans <coevans@tcd.ie>
package fluent

const Version = "1.4.0"
const Version = "1.9.0"
Copy link
Contributor Author

@conorevans conorevans Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is evidently outdated but I figured that it can't be removed for backwards compatibility, in which case it can be updated at least.

fluent/fluent.go Outdated

// time (unix milliseconds) at which the most recent connection to fluentd
// address was established.
latestReconnectTime int64
Copy link

@featherbread featherbread Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be time.Time instead of a Unix millisecond count, since that provides a monotonic clock for the later comparison operations. The check would be something like:

if time.Since(f.latestReconnectTime) > f.AsyncReconnectInterval*time.Millisecond {
    // reconnect
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied 👍 (just had to transform the int64 AsyncReconnectInterval to a time.Duration). It makes the code cleaner (no need for comments) and uses time native methods, thanks for the suggestion.

On the first tick of the for loop in run now (if AsyncReconnectInterval is set), the first connection will now be made by the code in the loop, whereas it previously did so in writeWithRetry. This is because of the time.Time "zero" value. We could avoid this by doing !f.latestReconnectTime.IsZero() but I don't see the need for this check, as it's the same connection call made, just in a different place, and the condition is only relevant the first run, and would somewhat bloat the code, in my opinion. Just wanted to call it out in case the maintainers here prefer that I do that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, was it failing to compile without the explicit time.Duration(…) conversion? I tried something like that in the playground and it seemed to work.

Copy link
Contributor Author

@conorevans conorevans Dec 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's because the reconnectInterval there is a const (an unsigned integer constant). When it's an attribute of a struct, like here, it needs to be cast: https://go.dev/play/p/WQP-uCAAh1h

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, you're absolutely right, I should have recognized that the difference was the untyped constant.

…nt64 comparisons that require comments

Signed-off-by: Conor Evans <coevans@tcd.ie>
@conorevans
Copy link
Contributor Author

FYI @fujimotos @ashie @kenhys (I know you guys only took over recently so just tagging in case procedure hasn't been set in stone yet)

@fujimotos
Copy link
Member

@conorevans I pulled your PR this morning, and created a test program that
makes use of the new option (AsyncReconnectInterval).

It seems that something is wrong with your implementation. As I see, it leaves
a stall connection on every refresh:

$ netstat -pnt
...
tcp        0      0 127.0.0.1:47576         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47632         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47650         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47580         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47600         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47618         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47608         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47664         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47628         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47606         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47616         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
tcp        0      0 127.0.0.1:47590         127.0.0.1:24224         ESTABLISHED 42456/./async-test        
...

@fujimotos
Copy link
Member

Feature wise, I have no particular opposition to this proposal. So once the above
issue gets resolved, I'll proceed and merge this PR.

@conorevans
Copy link
Contributor Author

Hi @fujimotos thanks and good spot! I have fixed that now by closing the connection before attempting a reconnect.

~ netstat -tcp
...
tcp4       0      0  ...    ip.24224       SYN_SENT
tcp4       0      0  ...    ip.24224       FIN_WAIT_1

Signed-off-by: Conor Evans <coevans@tcd.ie>
…inally it was int64 to be consistent with time package Unix() method return values)

Signed-off-by: Conor Evans <coevans@tcd.ie>
fujimotos pushed a commit that referenced this pull request Dec 18, 2021
This adds a new option "AsyncReconnectInterval" that defines the
interval at which an async connection to FluentdHost is refreshed.

This option is useful when FluentdHost is backed by a service pool,
where a set of healthy Fluentd workers are managed dynamically
behind a host name.

Signed-off-by: Conor Evans <coevans@tcd.ie>
Reviewed-by: Alex Hamlin <github@alexhamlin.co>
Signed-off-by: Fujimoto Seiji <fujimoto@ceptord.net>
@fujimotos
Copy link
Member

thanks and good spot! I have fixed that now by closing the connection before attempting a reconnect.

@conorevans Thanks! I can confirm that reconnections work fine
without any issues.

Pushed with some more Git metadata:

  • Squash the commits into one (to make it easier to do git bisect)
  • Describe a few more details in commit message (taken from the PR description).
  • Add acknowledgement to @ahamlinman

@fujimotos fujimotos closed this Dec 18, 2021
@fujimotos
Copy link
Member

Merged via 1c05506

@conorevans
Copy link
Contributor Author

Thanks @fujimotos ! Can you tag 1.9.0 when you have a chance?

@fujimotos
Copy link
Member

@conorevans I released v1.9.0. You can view the changelog here:

https://github.com/fluent/fluent-logger-golang/blob/master/CHANGELOG.md#190

@conorevans
Copy link
Contributor Author

Thank you @fujimotos I really appreciate the time and effort you put in 🙌

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature: periodically reconnect to fluentd-address

3 participants