Update Elasticsearch Output Plugin to retry bulk #2

Closed
wants to merge 1 commit into from
313 changes: 208 additions & 105 deletions lib/logstash/outputs/elasticsearch.rb
@@ -3,8 +3,10 @@
require "logstash/environment"
require "logstash/outputs/base"
require "logstash/json"
require "concurrent_ruby"
require "stud/buffer"
require "socket" # for Socket.gethostname
require "thread" # for safe queueing
require "uri" # for escaping user input
require 'logstash-output-elasticsearch_jars.rb'

@@ -204,18 +206,29 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Asks host for the list of all cluster nodes and adds them to the hosts list
config :sniffing, :validate => :boolean, :default => false

# helper function to replace placeholders
# in index names to wildcards
# example:
# "logs-%{YYYY}" -> "logs-*"
def wildcard_substitute(name)
name.gsub(/%\{[^}]+\}/, "*")
end
# Set max retry for each event
config :max_retries, :validate => :number, :default => 3

# Set retry policy for events that failed to send
config :retry_max_items, :validate => :number, :default => 100

# Set max interval between bulk retries
config :retry_max_interval, :validate => :number, :default => 5

public
def register
@submit_mutex = Mutex.new
# retry-specific variables
@retry_flush_mutex = Mutex.new
@retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
# needs flushing when interval
@retry_queue_needs_flushing = ConditionVariable.new
@retry_queue_not_full = ConditionVariable.new
@retry_queue = Queue.new

client_settings = {}


if @protocol.nil?
@protocol = LogStash::Environment.jruby? ? "node" : "http"
end
@@ -288,33 +301,27 @@ def register

@client = Array.new

if protocol == "node" or @host.nil? # if @protocol is "node" or @host is not set
options = {
:host => @host,
:port => @port,
}.merge(common_options)
@client << client_class.new(options)
if protocol == "node" || @host.nil? # if @protocol is "node" or @host is not set
options = { :host => @host, :port => @port }.merge(common_options)
@client = [client_class.new(options)]
else # if @protocol in ["transport","http"]
@host.each do |host|
(_host,_port) = host.split ":"
options = {
:host => _host,
:port => _port || @port,
}.merge(common_options)
@logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
@client << client_class.new(options)
end # @host.each
@client = @host.map do |host|
(_host,_port) = host.split ":"
options = { :host => _host, :port => _port || @port }.merge(common_options)
@logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
client_class.new(options)
end # @host.map
end

if @manage_template
for client in @client
begin
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
client.template_install(@template_name, get_template, @template_overwrite)
break
rescue => e
@logger.error("Failed to install template: #{e.message}")
end
begin
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
client.template_install(@template_name, get_template, @template_overwrite)
break
rescue => e
@logger.error("Failed to install template: #{e.message}")
end
end # for @client loop
end # if @manage_templates

@@ -330,8 +337,145 @@ def register
:max_interval => @idle_flush_time,
:logger => @logger
)

@retry_timer_thread = Thread.new do
loop do
sleep(@retry_max_interval)
@retry_queue_needs_flushing.signal
end
end

@retry_thread = Thread.new do
while @retry_teardown_requested.false?
@retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
retry_flush
end
end
Contributor:

I think we should move the signalling logic from the retry_flush method to here:

    @retry_thread = Thread.new do
      while @retry_teardown_requested.false?
        @retry_flush_mutex.synchronize{@retry_queue_needs_flushing.wait(@retry_flush_mutex)}
        retry_flush
      end
    end

Contributor Author:

Very true. Didn't catch this during our refactoring.
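The shape the thread agrees on — a worker that parks on a ConditionVariable and runs one flush per signal until teardown — can be sketched as a tiny standalone class. Everything below (class and method names included) is illustrative scaffolding, not the plugin's code:

```ruby
require "thread"

# Minimal sketch of the signalled-flush pattern discussed above: a worker
# thread waits on a ConditionVariable, runs one "flush" per signal, and
# exits cooperatively when a stop flag is set.
class SignalledFlusher
  attr_reader :flushes

  def initialize
    @mutex = Mutex.new
    @needs_flush = ConditionVariable.new
    @parked = false
    @stopping = false
    @flushes = 0
    @worker = Thread.new do
      loop do
        @mutex.synchronize do
          @parked = true
          @needs_flush.wait(@mutex)
          @parked = false
        end
        break if @stopping
        @flushes += 1           # stands in for retry_flush
      end
    end
  end

  # Signal one flush and wait for it to complete.
  def trigger
    before = @flushes
    wait_until_parked
    @mutex.synchronize { @needs_flush.signal }
    sleep 0.01 until @flushes > before
  end

  # Cooperative shutdown: wake the worker so it observes the stop flag,
  # instead of killing it mid-flush.
  def stop
    wait_until_parked
    @mutex.synchronize do
      @stopping = true
      @needs_flush.signal
    end
    @worker.join
  end

  private

  # @parked is only set true inside the worker's synchronize block, right
  # before wait releases the mutex, so seeing it while holding the mutex
  # guarantees the worker is inside wait() and the signal will not be lost.
  def wait_until_parked
    sleep 0.01 until @mutex.synchronize { @parked }
  end
end
```

Here `trigger` plays the role of both the interval timer and the "queue is full" signal, and `stop` mirrors the teardown path.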

end # def register


public
def get_template
if @template.nil?
@template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__))
if !File.exists?(@template)
raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
end
end
template_json = IO.read(@template).gsub(/\n/,'')
template = LogStash::Json.load(template_json)
template['template'] = wildcard_substitute(@index)
@logger.info("Using mapping template", :template => template)
return template
end # def get_template

public
def receive(event)
return unless output?(event)

# block until we have not maxed out our
# retry queue. This is applying back-pressure
# to slow down the receive-rate
@retry_queue_not_full.wait while @retry_queue.size > @retry_max_items

event['@metadata']['retry_count'] = 0

# Set the 'type' value for the index.
type = @index_type ? event.sprintf(@index_type) : (event["type"] || "logs")

index = event.sprintf(@index)

document_id = @document_id ? event.sprintf(@document_id) : nil
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event])
end # def receive

public
# synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the
# # client libraries which might not be thread safe. the submit method can be called from both the
# # Stud::Buffer flush thread and from our own retry thread.
def submit(actions)
es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
Contributor:

suggested comment:

# synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the
# client libraries which might not be thread safe. the submit method can be called from both the
# Stud::Buffer flush thread and from our own retry thread.

Contributor Author:

good call
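As an aside, the `@submit_mutex.lock` / `ensure`-unlock pair in the diff is equivalent to `Mutex#synchronize`, which also releases the lock if the protected call raises. A generic sketch of the same serialization, with a shared counter standing in for the non-thread-safe client:

```ruby
require "thread"

# Not the plugin's code: four threads contend for one critical section.
# Mutex#synchronize serializes access just like lock / begin / ensure-unlock.
mutex = Mutex.new
counter = 0

workers = 4.times.map do
  Thread.new do
    1_000.times do
      mutex.synchronize { counter += 1 }  # critical section, like the bulk call
    end
  end
end
workers.each(&:join)
counter  # => 4000
```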

@submit_mutex.lock
begin
bulk_response = @current_client.bulk(es_actions)
ensure
@submit_mutex.unlock
end
if bulk_response["errors"]
failed_actions = actions.select.with_index {|_,i| [429, 503].include?(bulk_response['statuses'][i]) }
unless failed_actions.empty?
@logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
retry_push(failed_actions)
end
end
end

# Does not raise an exception to prevent Stud::Buffer from
# attempting to resubmit successful actions within the bulk
# request.
public
def flush(actions, teardown = false)
begin
@logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
submit(actions)
rescue => e
@logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
ensure
unless @protocol == "node"
@logger.debug? and @logger.debug "Shifting current elasticsearch client"
shift_client
end
end
end # def flush

public
def teardown
if @cacert # remove temporary jks store created from the cacert
File.delete(@truststore)
end

@retry_teardown_requested.make_true
# First, make sure retry_timer_thread is stopped
# to ensure we do not signal a retry based on
# the retry interval.
Thread.kill(@retry_timer_thread)
Contributor:

we should synchronize on the mutex before calling .kill on the thread to avoid killing it in the middle of a flush operation and we should probably wait on the thread to finish before continuing.

@retry_flush_mutex.synchronize {Thread.kill(@retry_timer_thread)}
@retry_timer_thread.join

I think this is a pretty safe use of Thread#kill which should generally be avoided, see http://blog.headius.com/2008/02/ruby-threadraise-threadkill-timeoutrb.html

Contributor Author:

addressed
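For what it's worth, a stop flag can make `Thread#kill` unnecessary altogether. A minimal sketch with illustrative names and a shortened interval, not the plugin's code:

```ruby
require "thread"

# The timer loop re-checks a stop flag each tick, so it winds down on its
# own and can never be killed in the middle of a flush.
stop_requested = false
ticks = 0

timer = Thread.new do
  until stop_requested
    sleep 0.01   # stands in for sleep(@retry_max_interval)
    ticks += 1   # stands in for @retry_queue_needs_flushing.signal
  end
end

sleep 0.05       # let a few intervals elapse
stop_requested = true
timer.join       # clean join; the loop exits at the next flag check
```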

@retry_timer_thread.join
# Signal flushing in the case that #retry_flush is in
# the process of waiting for a signal.
@retry_queue_needs_flushing.signal
# Now, #retry_flush is ensured to not be in a state of
# waiting and can be safely joined into the main thread
# for further final execution of an in-process remaining call.
@retry_thread.join

# execute any final actions along with a proceeding retry for any
# final actions that did not succeed.
buffer_flush(:final => true)
retry_flush
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

@@plugins.each do |plugin|
name = plugin.name.split('-')[-1]
require "logstash/outputs/elasticsearch/#{name}"
end

protected
def start_local_elasticsearch
@logger.info("Starting embedded Elasticsearch local node.")
builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
# Disable 'local only' - LOGSTASH-277
#builder.local(true)
builder.settings.put("cluster.name", @cluster) if @cluster
builder.settings.put("node.name", @node_name) if @node_name
builder.settings.put("network.host", @bind_host) if @bind_host
builder.settings.put("http.port", @embedded_http_port)

@embedded_elasticsearch = builder.node
@embedded_elasticsearch.start
end # def start_local_elasticsearch

protected
def shift_client
@client_idx = (@client_idx+1) % @client.length
@@ -373,36 +517,6 @@ def setup_basic_auth
end
end

public
def get_template
if @template.nil?
@template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__))
if !File.exists?(@template)
raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
end
end
template_json = IO.read(@template).gsub(/\n/,'')
template = LogStash::Json.load(template_json)
template['template'] = wildcard_substitute(@index)
@logger.info("Using mapping template", :template => template)
return template
end # def get_template

protected
def start_local_elasticsearch
@logger.info("Starting embedded Elasticsearch local node.")
builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
# Disable 'local only' - LOGSTASH-277
#builder.local(true)
builder.settings.put("cluster.name", @cluster) if @cluster
builder.settings.put("node.name", @node_name) if @node_name
builder.settings.put("network.host", @bind_host) if @bind_host
builder.settings.put("http.port", @embedded_http_port)

@embedded_elasticsearch = builder.node
@embedded_elasticsearch.start
end # def start_local_elasticsearch

private
def generate_jks cert_path

@@ -426,55 +540,44 @@ def generate_jks cert_path
[jks.path, pwd]
end

public
def receive(event)
return unless output?(event)

# Set the 'type' value for the index.
if @index_type
type = event.sprintf(@index_type)
else
type = event["type"] || "logs"
end

index = event.sprintf(@index)

document_id = @document_id ? event.sprintf(@document_id) : nil
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event.to_hash])
end # def receive

def flush(actions, teardown=false)
begin
@logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
@current_client.bulk(actions)
rescue => e
@logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
raise e
ensure
unless @protocol == "node"
@logger.debug? and @logger.debug "Shifting current elasticsearch client"
shift_client
end
end
# TODO(sissel): Handle errors. Since bulk requests could mostly succeed
# (aka partially fail), we need to figure out what documents need to be
# retried.
#
# In the worst case, a failing flush (exception) will incur a retry from Stud::Buffer.
end # def flush

def teardown
if @cacert # remove temporary jks store created from the cacert
File.delete(@truststore)
private
# in charge of submitting any actions in @retry_queue that need to be
# retried
#
# This method is not called concurrently. It is only called by @retry_thread
# and once that thread is ended during the teardown process, a final call
# to this method is done upon teardown in the main thread.
def retry_flush()
unless @retry_queue.empty?
buffer = @retry_queue.size.times.map do
next_action, next_doc, next_event = @retry_queue.pop
next_event['@metadata']['retry_count'] += 1

if next_event['@metadata']['retry_count'] > @max_retries
@logger.error "too many attempts at sending event. dropping: #{next_event}"
Contributor:

until we have the DLQ, wondering if we should log more upon dropping the event?

#{next_event.to_hash_with_metadata.inspect}

??

Any thoughts @jordansissel ?

nil
else
[next_action, next_doc, next_event]
end
end.compact

submit(buffer) unless buffer.empty?
end
buffer_flush(:final => true)
@retry_queue_not_full.signal if @retry_queue.size < @retry_max_items
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

@@plugins.each do |plugin|
name = plugin.name.split('-')[-1]
require "logstash/outputs/elasticsearch/#{name}"
private
def retry_push(actions)
Array(actions).each{|action| @retry_queue << action}
@retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items
Contributor:

should we wait until @retry_queue.size >= @retry_max_items to signal a flush? why don't we signal it right away as soon as we pushed the failures? maybe refactor like this?

  # @param actions [Array|Object] a single or a collection of actions to retry
  def retry_push(actions)
    Array(actions).each { |action| @retry_queue << action }
    @retry_queue_needs_flushing.signal
  end

and the call to it would be:

      unless failed_actions.empty?
        @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
        retry_push(failed_actions)
      end

Contributor Author:

Makes sense, but I'm afraid this could backfire. I would prefer a known retry schedule (interval/size) over possibly constant failures and runaway retries. I think we can leave it as is for now, and if we see practical reasons for changing it, we can go ahead and change it as you suggest.

end
Contributor:

I suggest retry_push should also support a list of actions parameter to avoid calling .signal on all actions once @retry_queue.size >= @retry_max_items.

Calling retry_flush could now be

      unless failed_actions.empty?
        @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
        retry_push(failed_actions)
      end

and retry_flush could handle single or multiple actions with something like:

  def retry_push(actions)
    Array(actions).each{|action| @retry_queue << action}
    @retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items
  end

Contributor Author:

I now agree
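The threshold-based `retry_push` the thread converges on can be exercised in isolation. The class below is hypothetical scaffolding: it counts signals instead of waking a real flush thread through the ConditionVariable:

```ruby
require "thread"

# Not the plugin's code: retry_push accepts a single action or a collection,
# enqueues everything, and only "signals" a flush once the queue has reached
# the retry_max_items threshold.
class RetryBuffer
  attr_reader :signals

  def initialize(max_items)
    @queue = Queue.new
    @max_items = max_items
    @signals = 0
  end

  def retry_push(actions)
    Array(actions).each { |action| @queue << action }
    # stands in for @retry_queue_needs_flushing.signal
    @signals += 1 if @queue.size >= @max_items
  end

  def size
    @queue.size
  end
end

buf = RetryBuffer.new(3)
buf.retry_push(:a)       # single action: below threshold, no signal
buf.retry_push([:b, :c]) # batch: queue reaches 3, one signal fired
```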


# helper function to replace placeholders
# in index names to wildcards
# example:
# "logs-%{YYYY}" -> "logs-*"
private
def wildcard_substitute(name)
Contributor:

the wildcard_substitute method is declared twice, once at the top as a public method and also here

Contributor Author:

Whoops, didn't catch this during the rebase merge.

name.gsub(/%\{[^}]+\}/, "*")
end
end # class LogStash::Outputs::Elasticsearch
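For reference, the substitution `wildcard_substitute` performs is a single `gsub` over sprintf-style placeholders, and it can be tried standalone:

```ruby
# The gsub from the diff: every %{...} placeholder in an index name becomes
# a "*" wildcard, suitable for the template's index match pattern.
def wildcard_substitute(name)
  name.gsub(/%\{[^}]+\}/, "*")
end

wildcard_substitute("logs-%{YYYY}")            # => "logs-*"
wildcard_substitute("logstash-%{+YYYY.MM.dd}") # => "logstash-*"
wildcard_substitute("static-name")             # => "static-name"
```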