Threadsafety + Exponential Backoff #390
@@ -1,4 +1,4 @@
source 'https://rubygems.org'

# Specify your gem's dependencies in logstash-mass_effect.gemspec

gemspec
@@ -46,6 +46,8 @@
# Keep in mind that a connection with keepalive enabled will
# not reevaluate its DNS value while the keepalive is in effect.
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
  declare_threadsafe!

  require "logstash/outputs/elasticsearch/http_client"
  require "logstash/outputs/elasticsearch/http_client_builder"
  require "logstash/outputs/elasticsearch/common_configs"
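For background (not part of the diff): declaring the output threadsafe tells Logstash it can hand a single plugin instance to every pipeline worker, so multi_receive may be called from several threads at once and any shared state needs its own locking. A minimal, hypothetical sketch of that contract:

require "logstash/outputs/base"

# Hypothetical output, used only to illustrate what declare_threadsafe! implies.
class LogStash::Outputs::ExampleThreadsafe < LogStash::Outputs::Base
  config_name "example_threadsafe"

  # One shared instance serves all pipeline workers.
  declare_threadsafe!

  def register
    @mutex = Mutex.new  # guards state shared across worker threads
    @received = 0
  end

  # May run concurrently on several worker threads.
  def multi_receive(events)
    @mutex.synchronize { @received += events.size }
  end
end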
@@ -130,14 +132,35 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
  # a timeout occurs, the request will be retried.
  config :timeout, :validate => :number

  # While the output tries to reuse connections efficiently we have a maximum.
  # This sets the maximum number of open connections the output will create.
  # Setting this too low may mean frequently closing / opening connections
  # which is bad.
  config :pool_max, :validate => :number, :default => 1000

  # While the output tries to reuse connections efficiently we have a maximum per endpoint.
  # This sets the maximum number of open connections per endpoint the output will create.
  # Setting this too low may mean frequently closing / opening connections
  # which is bad.
  config :pool_max_per_route, :validate => :number, :default => 100
Review comment: I think the terms here can get a bit confusing if you aren't aware of the underlying httpclient library being used. During ingestion, how many distinct endpoints (routes) are going to be hit? We should clarify here the meaning of endpoint or route (and perhaps use only 1 of the terms), and also give a few hints on how to scale this value with the cluster size?

  # When a backend is marked down a HEAD request will be sent to this path in the
  # background to see if it has come back again before it is once again eligible
  # to service requests. If you have custom firewall rules you may need to change
  # this
  config :healthcheck_path, :validate => :string, :default => "/"

  # How frequently, in seconds, to wait between resurrection attempts.
  # Resurrection is the process by which backend endpoints marked 'down' are checked
  # to see if they have come back to life
  config :resurrect_delay, :validate => :number, :default => 5

  def build_client
    @client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
  end

  def close
    @stopping.make_true
    @client.stop_sniffing!
    @buffer.stop
  end

  @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
This file was deleted.
@@ -1,5 +1,4 @@
require "logstash/outputs/elasticsearch/template_manager"
require "logstash/outputs/elasticsearch/buffer"

module LogStash; module Outputs; class ElasticSearch;
  module Common

@@ -13,16 +12,11 @@ def register
      setup_hosts # properly sets @hosts
      build_client
      install_template
      setup_buffer_and_handler
      check_action_validity

      @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts)
    end

    def receive(event)
      @buffer << event_action_tuple(event)
    end

    # Receive an array of events and immediately attempt to index them (no buffering)
    def multi_receive(events)
      events.each_slice(@flush_size) do |slice|
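With the buffer and the per-event receive/flush methods removed, batching now comes from the pipeline itself: each batch handed to multi_receive is sliced and submitted with retries. A rough sketch of the resulting flow, assembled from the hunks above and below (not a verbatim copy of the plugin source):

# Events arrive already batched; each slice of @flush_size action tuples is
# submitted via retrying_submit instead of going through an internal Buffer.
def multi_receive(events)
  events.each_slice(@flush_size) do |slice|
    retrying_submit(slice.map { |e| event_action_tuple(e) })
  end
end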
@@ -37,10 +31,6 @@ def event_action_tuple(event)
      [action, params, event]
    end

    def flush
      @buffer.flush
    end

    def setup_hosts
      @hosts = Array(@hosts)
      if @hosts.empty?

@@ -53,12 +43,6 @@ def install_template
      TemplateManager.install_template(self)
    end

    def setup_buffer_and_handler
      @buffer = ::LogStash::Outputs::ElasticSearch::Buffer.new(@logger, @flush_size, @idle_flush_time) do |actions|
        retrying_submit(actions)
      end
    end

    def check_action_validity
      raise LogStash::ConfigurationError, "No action specified!" unless @action
@@ -75,33 +59,55 @@ def valid_actions
      VALID_HTTP_ACTIONS
    end

    def retrying_submit(actions)
      # Initially we submit the full list of actions
      submit_actions = actions

      sleep_interval = @retry_initial_interval

      while submit_actions && submit_actions.length > 0
        return if !submit_actions || submit_actions.empty? # If everything's a success we move along

        # We retry with whatever is didn't succeed
        begin
          submit_actions = submit(submit_actions)
          if submit_actions && submit_actions.size > 0
            @logger.error("Retrying individual actions")
            submit_actions.each {|action| @logger.error("Action", action) }
          end
        rescue => e
          @logger.warn("Encountered an unexpected error submitting a bulk request! Will retry.",
            :message => e.message,
          @logger.error("Encountered an unexpected error submitting a bulk request! Will retry.",
            :error_message => e.message,
            :class => e.class.name,
            :backtrace => e.backtrace)
        end

        sleep @retry_max_interval if submit_actions && submit_actions.length > 0
        # Everything was a success!
        break if !submit_actions || submit_actions.empty?

        # If we're retrying the action sleep for the recommended interval
        # Double the interval for the next time through to achieve exponential backoff
        Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
        sleep_interval = next_sleep_interval(sleep_interval)
      end
    end

    def submit(actions)
      es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash]}
    def sleep_for_interval(sleep_interval)
      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
      next_sleep_interval(sleep_interval)
    end

      bulk_response = safe_bulk(es_actions,actions)
    def next_sleep_interval(current_interval)
      doubled = current_interval * 2
      doubled > @retry_max_interval ? @retry_max_interval : doubled
    end

      # If there are no errors, we're done here!
      return unless bulk_response["errors"]
    def submit(actions)
      bulk_response = safe_bulk(actions)

      # If the response is nil that means we were in a retry loop
      # and aborted since we're shutting down
      # If it did return and there are no errors we're good as well
      return if bulk_response.nil? || !bulk_response["errors"]

      actions_to_retry = []
      bulk_response["items"].each_with_index do |response,idx|
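The sleep_for_interval/next_sleep_interval helpers above implement the capped exponential backoff from the PR title: the wait doubles after each failed attempt until it hits @retry_max_interval. A standalone sketch of the progression, assuming a 2 second starting interval and a 64 second cap (the actual defaults are configured elsewhere and are not visible in this diff):

# Illustration of the backoff math in next_sleep_interval above.
def next_sleep_interval(current_interval, max_interval)
  doubled = current_interval * 2
  doubled > max_interval ? max_interval : doubled
end

interval = 2   # assumed initial interval, in seconds
8.times do
  puts "waiting #{interval}s before retrying the bulk request"
  interval = next_sleep_interval(interval, 64)  # assumed cap, in seconds
end
# prints 2, 4, 8, 16, 32, 64, 64, 64: doubling until the cap is reached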
@@ -168,38 +174,64 @@ def get_event_type(event)
    end

    # Rescue retryable errors during bulk submission
    def safe_bulk(es_actions,actions)
      @client.bulk(es_actions)
    rescue Manticore::SocketException, Manticore::SocketTimeout => e
      # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
      # and let the user sort it out from there
      @logger.error(
        "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}',"+
        " but Elasticsearch appears to be unreachable or down!",
        :error_message => e.message,
        :class => e.class.name,
        :client_config => @client.client_options,
      )
      @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

      # We retry until there are no errors! Errors should all go to the retry queue
      sleep @retry_max_interval
      retry unless @stopping.true?
    rescue => e
      # For all other errors print out full connection issues
      @logger.error(
        "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
        " but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using " +
        "the configuration provided?",
        :error_message => e.message,
        :error_class => e.class.name,
        :backtrace => e.backtrace,
        :client_config => @client.client_options,
      )

      @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

      raise e
    def safe_bulk(actions)
      sleep_interval = @retry_initial_interval
      begin
        es_actions = actions.map {|action_type, params, event| [action_type, params, event.to_hash]}
Review comment: should we move this line outside of the begin block to avoid recomputing this on retry? I see no downside to it.
Reply: wasn't me. probably @talevy
Reply: twas me
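For illustration, the change suggested above would look roughly like this (a sketch, not code from this PR): hoist the tuple-to-hash mapping out of the begin block so a retry reuses the already-built es_actions instead of recomputing them.

def safe_bulk(actions)
  sleep_interval = @retry_initial_interval
  # Computed once; its value does not change between retry attempts.
  es_actions = actions.map { |action_type, params, event| [action_type, params, event.to_hash] }
  begin
    @client.bulk(es_actions)
  rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
    # ...same logging and backoff handling as in the diff that follows...
    sleep_interval = sleep_for_interval(sleep_interval)
    retry unless @stopping.true?
  end
end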
        response = @client.bulk(es_actions)
        response
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
        # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
        # and let the user sort it out from there
        @logger.error(
          "Attempted to send a bulk request to elasticsearch'"+
          " but Elasticsearch appears to be unreachable or down!",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        # We retry until there are no errors! Errors should all go to the retry queue
        sleep_interval = sleep_for_interval(sleep_interval)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
        @logger.error(
          "Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
        sleep_interval = next_sleep_interval(sleep_interval)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
        if RETRYABLE_CODES.include?(e.response_code)
          log_hash = {:code => e.response_code, :url => e.url}
          log_hash[:body] = e.body if @logger.debug? # Generally this is too verbose
          @logger.error("Attempted to send a bulk request to elasticsearch but received a bad HTTP response code!", log_hash)

          sleep_interval = sleep_for_interval(sleep_interval)
          retry unless @stopping.true?
        else
          @logger.error("Got a bad response code from server, but this code is not considered retryable. Request will be dropped", :code => e.code)
        end
      rescue => e
        # Stuff that should never happen
        # For all other errors print out full connection issues
        @logger.error(
          "An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
          :error_message => e.message,
          :error_class => e.class.name,
          :backtrace => e.backtrace
        )

        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        # We retry until there are no errors! Errors should all go to the retry queue
        sleep_interval = sleep_for_interval(sleep_interval)
        retry unless @stopping.true?
      end
    end
  end
end; end; end
Review comment: mention the http_client here