Threadsafety + Exponential Backoff #390

Closed
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.0.0
- Make this plugin threadsafe. Workers no longer needed or supported
- Add pool_max and pool_max_per_route options
Review comment (Contributor): mention the http_client here

## 3.0.2
- Fix issues where URI based paths in 'hosts' would not function correctly
## 3.0.1
2 changes: 1 addition & 1 deletion Gemfile
@@ -1,4 +1,4 @@
source 'https://rubygems.org'

# Specify your gem's dependencies in logstash-mass_effect.gemspec

gemspec
27 changes: 25 additions & 2 deletions lib/logstash/outputs/elasticsearch.rb
@@ -46,6 +46,8 @@
# Keep in mind that a connection with keepalive enabled will
# not reevaluate its DNS value while the keepalive is in effect.
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
declare_threadsafe!

require "logstash/outputs/elasticsearch/http_client"
require "logstash/outputs/elasticsearch/http_client_builder"
require "logstash/outputs/elasticsearch/common_configs"
@@ -130,14 +132,35 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# a timeout occurs, the request will be retried.
config :timeout, :validate => :number

# While the output tries to reuse connections efficiently we have a maximum.
# This sets the maximum number of open connections the output will create.
# Setting this too low may mean frequently closing / opening connections
# which is bad.
config :pool_max, :validate => :number, :default => 1000

# While the output tries to reuse connections efficiently we have a maximum per endpoint.
# This sets the maximum number of open connections per endpoint the output will create.
# Setting this too low may mean frequently closing / opening connections
# which is bad.
config :pool_max_per_route, :validate => :number, :default => 100
Review comment (Member): I think the terms here can get a bit confusing if you aren't aware of the underlying httpclient library being used. In that sense it's not obvious how a user should set these parameters for their 3, 10, or 100 node cluster.

During ingestion, how many distinct endpoints (routes) are going to be hit?
a) The number of nodes (hosts)?
b) The number of nodes * 2 (_bulk for ingest and _nodes for sniff)?

We should clarify the meaning of "endpoint" or "route" here (and perhaps use only one of the terms), and also give a few hints on how to scale this value with the cluster size.
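
As a rough illustration of how these settings might look to an end user, here is a minimal pipeline config sketch; the host names and values are illustrative assumptions, not recommendations from this PR:

    output {
      elasticsearch {
        hosts => ["es-node-01:9200", "es-node-02:9200"]   # hypothetical hosts
        pool_max => 1000              # total open connections across all endpoints
        pool_max_per_route => 100     # open connections per endpoint (per host URL)
      }
    }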


# When a backend is marked down a HEAD request will be sent to this path in the
# background to see if it has come back again before it is once again eligible
# to service requests. If you have custom firewall rules you may need to change
# this
config :healthcheck_path, :validate => :string, :default => "/"

# How frequently, in seconds, to wait between resurrection attempts.
# Resurrection is the process by which backend endpoints marked 'down' are checked
# to see if they have come back to life
config :resurrect_delay, :validate => :number, :default => 5
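
  # Illustrative example (hypothetical values, not defaults): in a pipeline config
  # these settings could be overridden as
  #   healthcheck_path => "/custom_ping"   # e.g. a path permitted by custom firewall rules
  #   resurrect_delay  => 10               # check downed backends every 10 seconds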

def build_client
@client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
end

def close
@stopping.make_true
@client.stop_sniffing!
@buffer.stop
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
124 changes: 0 additions & 124 deletions lib/logstash/outputs/elasticsearch/buffer.rb

This file was deleted.

148 changes: 90 additions & 58 deletions lib/logstash/outputs/elasticsearch/common.rb
@@ -1,5 +1,4 @@
require "logstash/outputs/elasticsearch/template_manager"
require "logstash/outputs/elasticsearch/buffer"

module LogStash; module Outputs; class ElasticSearch;
module Common
@@ -13,16 +12,11 @@ def register
setup_hosts # properly sets @hosts
build_client
install_template
setup_buffer_and_handler
check_action_validity

@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts)
end

def receive(event)
@buffer << event_action_tuple(event)
end

# Receive an array of events and immediately attempt to index them (no buffering)
def multi_receive(events)
events.each_slice(@flush_size) do |slice|
@@ -37,10 +31,6 @@ def event_action_tuple(event)
[action, params, event]
end

def flush
@buffer.flush
end

def setup_hosts
@hosts = Array(@hosts)
if @hosts.empty?
@@ -53,12 +43,6 @@ def install_template
TemplateManager.install_template(self)
end

def setup_buffer_and_handler
@buffer = ::LogStash::Outputs::ElasticSearch::Buffer.new(@logger, @flush_size, @idle_flush_time) do |actions|
retrying_submit(actions)
end
end

def check_action_validity
raise LogStash::ConfigurationError, "No action specified!" unless @action

@@ -75,33 +59,55 @@ def valid_actions
VALID_HTTP_ACTIONS
end

def retrying_submit(actions)
def retrying_submit(actions)
# Initially we submit the full list of actions
submit_actions = actions

sleep_interval = @retry_initial_interval

while submit_actions && submit_actions.length > 0
return if !submit_actions || submit_actions.empty? # If everything's a success we move along

# We retry with whatever didn't succeed
begin
submit_actions = submit(submit_actions)
if submit_actions && submit_actions.size > 0
@logger.error("Retrying individual actions")
submit_actions.each {|action| @logger.error("Action", action) }
end
rescue => e
@logger.warn("Encountered an unexpected error submitting a bulk request! Will retry.",
:message => e.message,
@logger.error("Encountered an unexpected error submitting a bulk request! Will retry.",
:error_message => e.message,
:class => e.class.name,
:backtrace => e.backtrace)
end

sleep @retry_max_interval if submit_actions && submit_actions.length > 0
# Everything was a success!
break if !submit_actions || submit_actions.empty?

# If we're retrying the action sleep for the recommended interval
# Double the interval for the next time through to achieve exponential backoff
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
sleep_interval = next_sleep_interval(sleep_interval)
end
end

def submit(actions)
es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash]}
def sleep_for_interval(sleep_interval)
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
next_sleep_interval(sleep_interval)
end

bulk_response = safe_bulk(es_actions,actions)
def next_sleep_interval(current_interval)
doubled = current_interval * 2
doubled > @retry_max_interval ? @retry_max_interval : doubled
end
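
# For a concrete sense of the backoff behavior above: assuming an initial retry
# interval of 2 seconds and a retry_max_interval of 64 seconds (illustrative
# values only), successive retries sleep roughly 2, 4, 8, 16, 32, 64, 64, ... seconds.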

# If there are no errors, we're done here!
return unless bulk_response["errors"]
def submit(actions)
bulk_response = safe_bulk(actions)

# If the response is nil that means we were in a retry loop
# and aborted since we're shutting down
# If it did return and there are no errors we're good as well
return if bulk_response.nil? || !bulk_response["errors"]

actions_to_retry = []
bulk_response["items"].each_with_index do |response,idx|
@@ -168,38 +174,64 @@ def get_event_type(event)
end

# Rescue retryable errors during bulk submission
def safe_bulk(es_actions,actions)
@client.bulk(es_actions)
rescue Manticore::SocketException, Manticore::SocketTimeout => e
# If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
# and let the user sort it out from there
@logger.error(
"Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}',"+
" but Elasticsearch appears to be unreachable or down!",
:error_message => e.message,
:class => e.class.name,
:client_config => @client.client_options,
)
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)

# We retry until there are no errors! Errors should all go to the retry queue
sleep @retry_max_interval
retry unless @stopping.true?
rescue => e
# For all other errors print out full connection issues
@logger.error(
"Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
" but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using " +
"the configuration provided?",
:error_message => e.message,
:error_class => e.class.name,
:backtrace => e.backtrace,
:client_config => @client.client_options,
)

@logger.debug("Failed actions for last bad bulk request!", :actions => actions)

raise e
def safe_bulk(actions)
sleep_interval = @retry_initial_interval
begin
es_actions = actions.map {|action_type, params, event| [action_type, params, event.to_hash]}
Review comment (Member): should we move this line outside of the begin block to avoid recomputing this on retry? I see no downside to it.

Reply (Contributor, author): @jsvd the overhead is minimal and it really makes understanding the flow much simpler. Before it was always confusing (actions + es_actions). I think @suyograo requested this change originally? Or was it @talevy?

Reply (Contributor): wasn't me. probably @talevy

Reply (Contributor): twas me

response = @client.bulk(es_actions)
response
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
# If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
# and let the user sort it out from there
@logger.error(
"Attempted to send a bulk request to elasticsearch'"+
" but Elasticsearch appears to be unreachable or down!",
:error_message => e.message,
:class => e.class.name,
:will_retry_in_seconds => sleep_interval
)
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)

# We retry until there are no errors! Errors should all go to the retry queue
sleep_interval = sleep_for_interval(sleep_interval)
retry unless @stopping.true?
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
@logger.error(
"Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
:error_message => e.message,
:class => e.class.name,
:will_retry_in_seconds => sleep_interval
)
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
sleep_interval = next_sleep_interval(sleep_interval)
retry unless @stopping.true?
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if RETRYABLE_CODES.include?(e.response_code)
log_hash = {:code => e.response_code, :url => e.url}
log_hash[:body] = e.body if @logger.debug? # Generally this is too verbose
@logger.error("Attempted to send a bulk request to elasticsearch but received a bad HTTP response code!", log_hash)

sleep_interval = sleep_for_interval(sleep_interval)
retry unless @stopping.true?
else
@logger.error("Got a bad response code from server, but this code is not considered retryable. Request will be dropped", :code => e.code)
end
rescue => e
# Stuff that should never happen
# For all other errors print out full connection issues
@logger.error(
"An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
:error_message => e.message,
:error_class => e.class.name,
:backtrace => e.backtrace
)

@logger.debug("Failed actions for last bad bulk request!", :actions => actions)

# We retry until there are no errors! Errors should all go to the retry queue
sleep_interval = sleep_for_interval(sleep_interval)
retry unless @stopping.true?
end
end
end
end; end; end