
Update Elasticsearch Output Plugin to retry bulk #2


Closed
talevy wants to merge 1 commit into master from fix/1631-retry

Conversation

@talevy (Contributor) commented Oct 31, 2014

Some actions may fail within the ES client bulk call.
Now, some messages (specifically those failing with 429 and 503
errors) will be retried up to 3 times. If some actions are still
not indexed successfully after that, Stud Buffer will continue its
current behavior (retrying indefinitely).

Stud Buffer will replay all events it first attempted to flush
to Elasticsearch. This means duplicate events may find themselves
in Elasticsearch.

(migrated from elastic/logstash#1997 with a few modifications to make
the tests run once the plugin is installed)

Stud::try(3.times) do
  bulk_response = @current_client.bulk(actions)
  if bulk_response["errors"]
    failed_actions = bulk_response['statuses'].map.with_index { |x, i| actions[i] unless [200, 201].include?(x) }.compact
Member
A reject might be simpler:

failed_actions = actions.reject.with_index {|_,i| [200, 201].include? bulk_response['statuses'][i] }
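For illustration (an editor's sketch with made-up data, not part of the PR), reject.with_index drops the actions whose corresponding bulk status is a success code:

# Hypothetical example data, for illustration only.
actions  = ["index_a", "index_b", "index_c"]
statuses = [201, 429, 503]

# reject without a block returns an Enumerator, so with_index supplies the
# position needed to look up the matching bulk status.
failed_actions = actions.reject.with_index { |_, i| [200, 201].include?(statuses[i]) }
# => ["index_b", "index_c"]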

Contributor Author

I didn't know about reject. I like it!

@jordansissel (Contributor)

Let's only retry the documents which failed :)

@jordansissel (Contributor)

If we have to improve Stud::Buffer, we can do that, but we should only retry docs that failed.

@talevy (Contributor, Author) commented Nov 4, 2014

@jordansissel do you see a way of accomplishing that without updating Stud::Buffer to
handle the partials here? https://github.com/jordansissel/ruby-stud/blob/master/lib/stud/buffer.rb#L217-L227

first way I see:

  • update flush interface to return status
  • use that status to delete from @buffer_state[:outgoing_items]/@buffer_state[:pending_items] and
    @buffer_state[:outgoing_count]/@buffer_state[:pending_count] accordingly.

what do you think?
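For illustration only, a hypothetical sketch of that first idea (not the approach that was ultimately merged): a flush that reports failures back, so a modified Stud::Buffer could keep only the failed items in its pending/outgoing state instead of replaying the whole batch:

# Hypothetical sketch, for discussion purposes only.
def flush(actions)
  bulk_response = @current_client.bulk(actions)
  return [] unless bulk_response["errors"]

  # keep only the actions whose bulk status is a retryable failure
  actions.select.with_index do |_, i|
    [429, 503].include?(bulk_response['statuses'][i])
  end
end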

@talevy (Contributor, Author) commented Nov 12, 2014

Update from discussions:

We will take over the whole retry management so as to prevent
Stud::Buffer from retrying all queued-up events (including successful ones)

@talevy force-pushed the fix/1631-retry branch 4 times, most recently from bd290de to 3684a81 on November 17, 2014, 21:43
@colinsurprenant (Contributor)

I will make a few inline comments, but I'd like to discuss the submit + retry_flush design where retry_flush can call submit... this sequence is hard to reason about: submit -> retry_flush -> submit -> ...

@@ -208,10 +215,26 @@ def wildcard_substitute(name)
    name.gsub(/%\{[^}]+\}/, "*")
  end

  def retry_flush
    unless @retry_queue.empty?
      buffer = @retry_queue.size.times.collect { @retry_queue.pop }
Contributor

personal preference: I prefer the use of map instead of collect simply because map originates from functional programming languages and is widely used by most programming languages.

Contributor

I think there is a potential race condition here. retry_flush is called both by the flush thread at a periodic interval and by the submit method. The statement

buffer = @retry_queue.size.times.collect { @retry_queue.pop }

grabs the @retry_queue size and then iterates that many times to pop elements. We can get into a situation where retry_flush is executed concurrently: both threads read the queue size, but each thread also pops elements, so both threads can eventually block on @retry_queue.pop because the other thread has already popped the elements they counted on.
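For illustration, one way to avoid that blocking (an editor's sketch, not necessarily what the PR ended up doing) is to drain with non-blocking pops instead of trusting a size snapshot:

# Sketch only: Queue#pop(true) raises ThreadError when the queue is empty,
# so a concurrent drainer can never make this thread block.
def drain_retry_queue
  buffer = []
  begin
    loop { buffer << @retry_queue.pop(true) }
  rescue ThreadError
    # queue is empty (possibly drained by the other thread), stop here
  end
  buffer
end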

@colinsurprenant (Contributor)

In the receive method, maybe rewrite

    if @index_type
      type = event.sprintf(@index_type)
    else
      type = event["type"] || "logs"
    end

into

  type = @index_type ? event.sprintf(@index_type) : (event["type"] || "logs")

failed_actions = actions.select.with_index { |_, i| [429, 503].include?(bulk_response['statuses'][i]) }
unless failed_actions.empty?
  @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
  for failed_action in failed_actions
Contributor

also, style-wise, I'd avoid for loops and use iterators like each
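For illustration, the loop in the excerpt above could read as follows (a sketch of the suggested style; the loop body shown is hypothetical, since the original body is not part of the excerpt):

# each keeps the block variable scoped to the block, unlike `for`.
failed_actions.each do |failed_action|
  @retry_queue << failed_action
end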

@talevy force-pushed the fix/1631-retry branch 2 times, most recently from c6b793a to 505bd37 on December 1, 2014, 15:26
end

[next_action, next_doc, next_event]
}.compact
Contributor

did you mean to have return nil return from the method, or to have nil as the map block result? I assume the latter, since you call compact at the end. But as written, return nil will return from the enclosing method. I'd suggest rewriting this as:

buffer = @retry_queue.size.times.map do
  next_action, next_doc, next_event = @retry_queue.pop
  next_event['@metadata']['retry_count'] += 1

  if next_event['@metadata']['retry_count'] > @max_retries
    @logger.error("too many attempts at sending event. dropping: #{next_event}")
    next
  end

  [next_action, next_doc, next_event]
end.compact

next returns from the block. you can specify a return value to next, but a bare next is equivalent to next nil.
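A minimal, self-contained illustration of the difference (an editor's example, not from the PR):

# next ends only the current block iteration; compact removes the nils.
def keep_evens_with_next(numbers)
  numbers.map { |n| next if n.odd?; n }.compact
end

# return inside the block exits the whole method at the first odd number.
def keep_evens_with_return(numbers)
  numbers.map { |n| return nil if n.odd?; n }
end

keep_evens_with_next([1, 2, 3, 4])    # => [2, 4]
keep_evens_with_return([1, 2, 3, 4])  # => nil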

@colinsurprenant (Contributor)

noticed the use of or as a boolean logical operator at https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/2/files#diff-e1eda60fc5950f2a16e1209545609fdfL286; as discussed, we should use || instead
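For context, a small illustration of why the two differ (an editor's example, not from the PR): or binds more loosely than assignment, while || binds more tightly, so the two can yield different results:

# `or` has lower precedence than `=`, `||` has higher precedence.
a = false or true    # parsed as (a = false) or true   => a is false
b = false || true    # parsed as  b = (false || true)  => b is true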

@colinsurprenant (Contributor)

Also, a general comment about method visibility: I personally prefer to group methods and write the visibility modifier once; all method definitions that follow the modifier inherit it. I think it is easier to read and understand: all public methods first, then the protected ones, and finally the private methods. No need to specify the visibility on each...
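For illustration, the grouping style being suggested (an editor's sketch with hypothetical method names, not code from the PR):

class ExampleOutput
  # all public methods first...
  def register; end
  def receive(event); end

  private

  # ...then a single `private` covers every definition that follows it.
  def internal_helper; end
  def another_internal_helper; end
end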

@talevy force-pushed the fix/1631-retry branch 2 times, most recently from 2fb3e58 to 4c0ea0b on December 4, 2014, 00:03
@talevy force-pushed the fix/1631-retry branch 2 times, most recently from 9bc3267 to 93b63f1 on January 12, 2015, 23:08

public
def submit(actions)
  es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
Contributor

suggested comment:

# synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the
# client libraries which might not be thread safe. the submit method can be called from both the
# Stud::Buffer flush thread and from our own retry thread.
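For illustration, applying that suggestion might look like this (an editor's sketch; the @submit_mutex name is an assumption, not necessarily what was merged):

# Hypothetical sketch: guard the bulk call with a mutex because submit can be
# called from both the Stud::Buffer flush thread and the retry thread, and the
# client library might not be thread safe.
def register
  @submit_mutex = Mutex.new
  # ... rest of the plugin setup ...
end

def submit(actions)
  es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
  bulk_response = @submit_mutex.synchronize { @current_client.bulk(es_actions) }
  # ... inspect bulk_response["errors"] and re-queue retryable failures ...
end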

Contributor Author

good call

buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event.to_hash])
end # def receive
if next_event['@metadata']['retry_count'] > @max_retries
@logger.error "too many attempts at sending event. dropping: #{next_event}"
Contributor

until we have the DLQ, wondering if we should log more upon dropping the event?

#{next_event.to_hash_with_metadata.inspect}

??

Any thoughts @jordansissel ?
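For illustration, the fuller log line being suggested might look like this (an editor's sketch, not the merged code):

# Sketch: include the full event (with @metadata) when dropping it, so the
# data is at least recoverable from the logs until a dead-letter queue exists.
@logger.error("too many attempts at sending event. dropping: " \
              "#{next_event.to_hash_with_metadata.inspect}")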

@talevy (Contributor, Author) commented Jan 13, 2015

@colinsurprenant updated again! with extra test for the max_retries logic (which was not being tested, as the return nil went by unnoticed)

@colinsurprenant (Contributor)

Looks pretty good to me. We will need to follow up with a code reorg/cleanup PR.
I did not review the tests, and I did not stress test or do an integration test on this. I think we should. For HTTP we could simply create a dummy HTTP server to mock an ES instance.
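For illustration, a minimal mock along those lines (an editor's sketch; the port, endpoint, and canned response are assumptions, and the errors/statuses shape mirrors what the plugin code in this PR consumes rather than the items array a real Elasticsearch _bulk response returns):

require 'webrick'
require 'json'

# Throwaway HTTP server that answers _bulk requests with a canned response
# containing a retryable status, for integration testing the retry path.
server = WEBrick::HTTPServer.new(
  :Port      => 9200,
  :AccessLog => [],
  :Logger    => WEBrick::Log.new(File::NULL)
)

server.mount_proc('/_bulk') do |req, res|
  res['Content-Type'] = 'application/json'
  # pretend the second action was rejected with a 429 so the plugin retries it
  res.body = { 'errors' => true, 'statuses' => [201, 429] }.to_json
end

trap('INT') { server.shutdown }
server.start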

Some actions may fail within the ES client bulk call.
Now, some messages (specifically those failing with 429 and 503
errors) will be retried up to a configurable number of times
(default 3). If there are still actions that were not indexed
successfully, those messages are added to a separate retry queue
that has its own thread managing its flushes. Back-pressure is
applied to prevent too many failed events from accumulating.
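For illustration, the back-pressure idea can be sketched with a bounded queue (an editor's sketch; the names, size limit, and interval are assumptions, not the merged implementation):

require 'thread'

# A SizedQueue blocks the producer when it is full, which is what applies
# back-pressure to the pipeline when too many failed events pile up.
RETRY_QUEUE_LIMIT = 1000
@retry_queue = SizedQueue.new(RETRY_QUEUE_LIMIT)

# producer side: called for each retryable failure; blocks when the queue is full
def enqueue_for_retry(action, doc, event)
  @retry_queue << [action, doc, event]
end

# consumer side: a dedicated thread periodically drains the queue for re-submission
@retry_thread = Thread.new do
  loop do
    sleep(5)   # assumed flush interval, for illustration
    batch = []
    batch << @retry_queue.pop until @retry_queue.empty?
    # a real implementation would re-submit `batch` to Elasticsearch here
  end
end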
@talevy (Contributor, Author) commented Jan 13, 2015

thanks @colin for the thorough review

@elasticsearch-bot

Merged successfully into master!

@jsvd (Member)

jsvd commented Jan 14, 2015

WOOHOOO 🎆 so glad to see this merged! 🎉

@ralphm commented Feb 23, 2015

I just ran into the issue of dropped events because I mistakenly tried to index a document with a type starting with an underscore. This yields a 400 error, but the drop happens silently and I only found out what happened by tcpdump. Although I see elastic/logstash#1631 is closed because of this ticket, I couldn't find the follow-up PR that @colinsurprenant hinted at?

@talevy (Contributor, Author) commented Feb 23, 2015

@ralphm I will add more explicit logging to inform of this better. Currently we only retry 429 and 503 errors. Others will be logged and dropped until we have a persistent way of keeping track of these without halting the pipeline. Let me know if this would help, or maybe another behavior you would expect.

edit: created PR and will address this today: #62 @ralphm let me know if that is helpful enough. I can update the message to be whatever you think would be useful and easy to parse for you! Thanks!

@talevy (Contributor, Author) commented Mar 2, 2015

@ralphm the logging changes have been merged into master. hope that makes things a little better until we add dead-letter queuing support!
