-
Notifications
You must be signed in to change notification settings - Fork 105
Feature: periodically reconnect to fluentd-address #111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Conor Evans <coevans@tcd.ie>
| package fluent | ||
|
|
||
| const Version = "1.4.0" | ||
| const Version = "1.9.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is evidently outdated but I figured that it can't be removed for backwards compatibility, in which case it can be updated at least.
fluent/fluent.go
Outdated
|
|
||
| // time (unix milliseconds) at which the most recent connection to fluentd | ||
| // address was established. | ||
| latestReconnectTime int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be time.Time instead of a Unix millisecond count, since that provides a monotonic clock for the later comparison operations. The check would be something like:
if time.Since(f.latestReconnectTime) > f.AsyncReconnectInterval*time.Millisecond {
// reconnect
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied 👍 (just had to transform the int64 AsyncReconnectInterval to a time.Duration). It makes the code cleaner (no need for comments) and uses time native methods, thanks for the suggestion.
On the first tick of the for loop in run now (if AsyncReconnectInterval is set), the first connection will now be made by the code in the loop, whereas it previously did so in writeWithRetry. This is because of the time.Time "zero" value. We could avoid this by doing !f.latestReconnectTime.IsZero() but I don't see the need for this check, as it's the same connection call made, just in a different place, and the condition is only relevant the first run, and would somewhat bloat the code, in my opinion. Just wanted to call it out in case the maintainers here prefer that I do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, was it failing to compile without the explicit time.Duration(…) conversion? I tried something like that in the playground and it seemed to work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's because the reconnectInterval there is a const (an unsigned integer constant). When it's an attribute of a struct, like here, it needs to be cast: https://go.dev/play/p/WQP-uCAAh1h
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, you're absolutely right, I should have recognized that the difference was the untyped constant.
…nt64 comparisons that require comments Signed-off-by: Conor Evans <coevans@tcd.ie>
|
FYI @fujimotos @ashie @kenhys (I know you guys only took over recently so just tagging in case procedure hasn't been set in stone yet) |
|
@conorevans I pulled your PR this morning, and created a test program that It seems that something is wrong with your implementation. As I see, it leaves $ netstat -pnt
...
tcp 0 0 127.0.0.1:47576 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47632 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47650 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47580 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47600 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47618 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47608 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47664 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47628 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47606 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47616 127.0.0.1:24224 ESTABLISHED 42456/./async-test
tcp 0 0 127.0.0.1:47590 127.0.0.1:24224 ESTABLISHED 42456/./async-test
... |
|
Feature wise, I have no particular opposition to this proposal. So once the above |
|
Hi @fujimotos thanks and good spot! I have fixed that now by closing the connection before attempting a reconnect. ➜ ~ netstat -tcp
...
tcp4 0 0 ... ip.24224 SYN_SENT
tcp4 0 0 ... ip.24224 FIN_WAIT_1 |
Signed-off-by: Conor Evans <coevans@tcd.ie>
…inally it was int64 to be consistent with time package Unix() method return values) Signed-off-by: Conor Evans <coevans@tcd.ie>
This adds a new option "AsyncReconnectInterval" that defines the interval at which an async connection to FluentdHost is refreshed. This option is useful when FluentdHost is backed by a service pool, where a set of healthy Fluentd workers are managed dynamically behind a host name. Signed-off-by: Conor Evans <coevans@tcd.ie> Reviewed-by: Alex Hamlin <github@alexhamlin.co> Signed-off-by: Fujimoto Seiji <fujimoto@ceptord.net>
@conorevans Thanks! I can confirm that reconnections work fine Pushed with some more Git metadata:
|
|
Merged via 1c05506 |
|
Thanks @fujimotos ! Can you tag 1.9.0 when you have a chance? |
|
@conorevans I released v1.9.0. You can view the changelog here: https://github.com/fluent/fluent-logger-golang/blob/master/CHANGELOG.md#190 |
|
Thank you @fujimotos I really appreciate the time and effort you put in 🙌 |
Signed-off-by: Conor Evans coevans@tcd.ie
Closes #110
Motivation:
In cases where the FluentHost can resolve to one of many IPs (e.g. a Consul service address), periodic reconnection is a desirable feature. In a Consul example, a member can be marked unhealthy and leave the service pool, or new members can be added to the service pool. While
ForceStopAsyncSendis welcomed in that it prevents Docker containers hanging, but it doesn't prevent log loss, and a single application with a log spike can still overwhelm a fluentd worker. If periodic reconnection were in place, FluentHost could resolve to a new address and continue logging.Implementation:
I initialize considered spawning a separate goroutine
reconnectwith a Ticker, but thought that closing the goroutine reliably could be difficult, and that if theAsyncReconnectIntervalwere too long (even 5s would be a pretty long time to come back around to a new time to select the stopRunning channel and end the goroutine). So I piggybacked onrunand used alastReconnectTimeimplementation instead. I lock/unlock the muconn around the call as directed byconnectWithRetry(nodefersince we want it unlocked straight away forwriteWithRetrysoon thereafter.Considerations:
I don't see any performance tests that can be benchmarked, but I tested two Docker applications side-by-side, one with the fluentd logging driver, the other using
fluent.Postbased on my fork, and the applications were logging consistently (w.r.t volume).I went with ms as the unit for
AsyncReconnectIntervalbecause it's consistent with other config attributes (e.g. RetryWait). Since this library is expected to support go1.13, go1.16, and go1.17 per the CI config, I couldn't use theUnixMillimethod from go1.17 -- at some point in the future the comments can be removed and the code updated.Testing:
Use a FluentHost that can resolve to one or more IPs (e.g. a Consul service address). Observe that it is possible for the FluentHost to change now that reconnection occurs.