Update Elasticsearch Output Plugin to retry bulk #2
Conversation
Stud::try(3.times) do
  bulk_response = @current_client.bulk(actions)
  if bulk_response["errors"]
    failed_actions = bulk_response['statuses'].map.with_index {|x, i| actions[i] unless [200, 201].include?(x)}.compact
A reject might be simpler:
failed_actions = actions.reject.with_index {|_,i| [200, 201].include? bulk_response['statuses'][i] }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know about reject. I like it!
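For readers following along, the reject.with_index suggestion can be checked with a small self-contained sketch (the actions and statuses here are invented, not the plugin's real data):

```ruby
# reject without a block returns an Enumerator, so with_index can supply
# the position used to look up the corresponding bulk response status.
actions  = [:a, :b, :c]
statuses = [200, 500, 201]

failed = actions.reject.with_index { |_, i| [200, 201].include? statuses[i] }
p failed  # => [:b]
```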
(force-pushed 85a098b to 7e5583f)
Let's only retry the documents which failed :)
If we have to improve Stud::Buffer, we can do that, but we should only retry docs that failed.
@jordansissel do you see a way of accomplishing that without updating Stud::Buffer? The first way I see: what do you think?
7e5583f
to
51938d2
Compare
Update from discussions: we will take over the whole retry management, so as to prevent replaying events that were already successfully indexed.
(force-pushed bd290de to 3684a81)
(force-pushed 3684a81 to cfcf68b)
I will make a few inline comments, but I'd like to discuss the …
@@ -208,10 +215,26 @@ def wildcard_substitute(name)
    name.gsub(/%\{[^}]+\}/, "*")
  end

  def retry_flush
    unless @retry_queue.empty?
      buffer = @retry_queue.size.times.collect { @retry_queue.pop }
personal preference: I prefer the use of map instead of collect, simply because map originates from functional programming languages and is widely used by most programming languages.
I think there is a potential race condition here. retry_flush is called both by the flush thread at a periodic interval and by the submit method. The statement

buffer = @retry_queue.size.times.collect { @retry_queue.pop }

grabs the @retry_queue size and then iterates that many times to pop elements. We can get into a situation where retry_flush is executed concurrently; when that happens, both threads read their queue size, but each also pops elements, so both threads will eventually block on @retry_queue.pop because the other thread is also popping elements from it.
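One way to avoid this size-then-pop race is to drain the queue with non-blocking pops instead of trusting a size read earlier. A minimal sketch (not the plugin's actual code; the drain helper is invented for illustration):

```ruby
# Queue#pop(true) is a non-blocking pop: instead of waiting when the
# queue is empty, it raises ThreadError, which we use as the stop signal.
# This never blocks, no matter how many threads drain concurrently.
def drain(queue)
  items = []
  begin
    loop { items << queue.pop(true) }
  rescue ThreadError
    # queue is empty; stop draining
  end
  items
end

q = Queue.new
3.times { |i| q << i }
p drain(q)  # => [0, 1, 2]
p drain(q)  # => [] (a second drain just sees an empty queue)
```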
The

  if @index_type
    type = event.sprintf(@index_type)
  else
    type = event["type"] || "logs"
  end

block could be collapsed into

  type = @index_type ? event.sprintf(@index_type) : (event["type"] || "logs")
failed_actions = actions.select.with_index {|_,i| [429, 503].include?(bulk_response['statuses'][i]) }
unless failed_actions.empty?
  @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
  for failed_action in failed_actions
also, style-wise, I'd avoid for loops and use iterators like each.
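A small sketch of that style suggestion, with invented contents. Beyond style, each also keeps its block variable scoped to the block, whereas for leaks its loop variable into the enclosing scope:

```ruby
# Hypothetical failed actions; in the plugin these would be the
# [action, doc, event] tuples selected for retry.
failed_actions = ["doc-1", "doc-2"]

retried = []
failed_actions.each do |failed_action|
  retried << failed_action  # e.g. push back onto the retry queue
end
p retried  # => ["doc-1", "doc-2"]
```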
(force-pushed c6b793a to 505bd37)
        end

        [next_action, next_doc, next_event]
      }.compact
did you mean having return nil return from the method, or having nil as the map block result? I assume the latter, since you call compact at the end. In this case the return nil will return from the function. I'd suggest rewriting this as
buffer = @retry_queue.size.times.map do
next_action, next_doc, next_event = @retry_queue.pop
next_event['@metadata']['retry_count'] += 1
if next_event['@metadata']['retry_count'] > @max_retries
@logger.error("too many attempts at sending event. dropping: #{next_event}")
next
end
[next_action, next_doc, next_event]
end.compact
next returns from the block. You can specify a return value to next, but a bare next is equivalent to next nil.
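A runnable illustration of the difference (toy methods, not code from this PR):

```ruby
# Inside a map block, a bare next yields nil for that iteration only.
def with_next
  [1, 2, 3].map { |n| next if n == 2; n * 10 }
end

# return inside a block exits the *enclosing method*, not just the block.
def with_return
  [1, 2, 3].map { |n| return :early if n == 2; n * 10 }
end

p with_next          # => [10, nil, 30]
p with_next.compact  # => [10, 30]
p with_return        # => :early (the whole method returned)
```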
Noticed the use of public. Also, a general comment about method visibility: I personally prefer to group methods and write the visibility modifier once; all method definitions that follow the modifier inherit it. I think it is easier to read & understand: all public methods first, then the protected ones, and finally the private methods. No need to specify the visibility on each...
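A minimal sketch of that grouping style (the class and method names here are illustrative only, loosely echoing the plugin's):

```ruby
class Example
  # public by default
  def submit
    :ok
  end

  private  # every definition below this line is private; no per-method modifier

  def retry_flush
  end

  def helper
  end
end

e = Example.new
p e.respond_to?(:submit)       # => true
p e.respond_to?(:retry_flush)  # => false (private, not callable from outside)
```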
(force-pushed 2fb3e58 to 4c0ea0b)
(force-pushed 9bc3267 to 93b63f1)
  public
  def submit(actions)
    es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
suggested comment:
# synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the
# client libraries which might not be thread safe. the submit method can be called from both the
# Stud::Buffer flush thread and from our own retry thread.
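In isolation, the suggested synchronization might look like the sketch below. FakeClient and Submitter are invented stand-ins for the plugin's Elasticsearch client and output class; only the Mutex pattern is the point:

```ruby
# Stand-in for the ES client; real bulk calls may not be thread safe.
class FakeClient
  def bulk(actions)
    { "errors" => false, "count" => actions.size }
  end
end

class Submitter
  def initialize(client)
    @current_client = client
    @request_mutex = Mutex.new
  end

  # Serialize access to the client: submit can be called from both the
  # Stud::Buffer flush thread and the retry thread.
  def submit(actions)
    @request_mutex.synchronize { @current_client.bulk(actions) }
  end
end

s = Submitter.new(FakeClient.new)
p s.submit([["index", {}, { "msg" => "a" }]])  # => {"errors"=>false, "count"=>1}
```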
good call
(force-pushed 93b63f1 to d364f87)
  buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event.to_hash])
end # def receive

  if next_event['@metadata']['retry_count'] > @max_retries
    @logger.error "too many attempts at sending event. dropping: #{next_event}"
until we have the DLQ, wondering if we should log more upon dropping the event? #{next_event.to_hash_with_metadata.inspect} ??
Any thoughts @jordansissel ?
(force-pushed d364f87 to 78de271)
@colinsurprenant updated again! with extra test for the …
Looks pretty good to me. We will need to follow up with a code reorg/cleanup PR.
Some actions may fail within the ES client bulk call. Now, some messages (specifically errors 429 and 503) will be retried up to a configurable number of times (default 3). If there are still actions that were unsuccessfully indexed, these messages will be added to a separate retry queue that has a separate thread managing its flushes. Back-pressure will be applied to prevent too many failed events from accumulating.
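The 429/503 selection described here can be sketched in isolation (the actions, statuses, and helper name are invented for the example; the select.with_index pattern matches the diff above):

```ruby
# 429 (too many requests) and 503 (unavailable) are transient, so only
# actions with those bulk response statuses are queued for retry.
RETRYABLE_CODES = [429, 503]

def select_retryable(actions, statuses)
  actions.select.with_index { |_, i| RETRYABLE_CODES.include?(statuses[i]) }
end

actions  = [:a, :b, :c, :d]
statuses = [201, 429, 200, 503]
p select_retryable(actions, statuses)  # => [:b, :d]
```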
(force-pushed 78de271 to 7bcc63b)
thanks @colin for the thorough review
Merged successfully into master!
WOOHOOO 🎆 so glad to see this merged! 🎉
I just ran into the issue of dropped events because I mistakenly tried to index a document with a type starting with an underscore. This yields a …
@ralphm I will add more explicit logging to inform of this better. Currently we only retry 429 and 503 errors. edit: created PR and will address this today: #62. @ralphm let me know if that is helpful enough. I can update the message to be whatever you think would be useful and easy to parse for you! Thanks!
@ralphm the logging changes have been merged into master. hope that makes things a little better until we add dead-letter queuing support!
Some actions may fail within the ES client bulk call.
Now, some messages (specifically errors 429 and 503s) will be
retried up to 3 times. If there are still actions that are
unsuccessfully indexed, Stud Buffer will continue its current
behavior (to retry indefinitely).
Stud Buffer will replay all events it first attempted to flush
to Elasticsearch. This means duplicate events may find themselves
in Elasticsearch.
(migrated from elastic/logstash#1997 with a few modifications to make
tests run once plugin is installed)