-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix memory leak in Filebeat pipeline acker #12063
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -133,25 +133,31 @@ func (a *gapCountACK) ackLoop() { | |
|
||
acks, drop := a.acks, a.drop | ||
closing := false | ||
empty := false | ||
|
||
for { | ||
select { | ||
case <-a.done: | ||
closing = true | ||
a.done = nil | ||
if a.events.Load() == 0 { | ||
// stop worker, if all events accounted for have been ACKed | ||
return | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is tricky and potentially racy. The Checking the client code, I found the addEvent can indeed be called after the Might be ok, though. The ACKer also ignores pending ACKs if shutdown is forced. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added a comment to explain the situation here |
||
|
||
case <-a.pipeline.ackDone: | ||
return | ||
|
||
case n := <-acks: | ||
empty := a.handleACK(n) | ||
empty = a.handleACK(n) | ||
if empty && closing && a.events.Load() == 0 { | ||
// stop worker, iff all events accounted for have been ACKed | ||
// stop worker, if all events accounted for have been ACKed | ||
exekias marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
|
||
case <-drop: | ||
// TODO: accumulate multiple drop events + flush count with timer | ||
a.events.Sub(1) | ||
a.fn(1, 0) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the empty variable is only used in the
case n := <-acks
block. As the state is changed asynchronously the 'empty' variable is MUST only be used after a call the handleACK and ifclosing
is true. Let's reduce the scope whereempty
is available to the block where it is actually initialised and used.