Update Elasticsearch Output Plugin to retry bulk #2
@@ -3,8 +3,10 @@
 require "logstash/environment"
 require "logstash/outputs/base"
 require "logstash/json"
+require "concurrent_ruby"
 require "stud/buffer"
 require "socket" # for Socket.gethostname
+require "thread" # for safe queueing
 require "uri" # for escaping user input
 require 'logstash-output-elasticsearch_jars.rb'
@@ -204,18 +206,29 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # Asks host for the list of all cluster nodes and adds them to the hosts list
   config :sniffing, :validate => :boolean, :default => false

-  # helper function to replace placeholders
-  # in index names to wildcards
-  # example:
-  #   "logs-%{YYYY}" -> "logs-*"
-  def wildcard_substitute(name)
-    name.gsub(/%\{[^}]+\}/, "*")
-  end
+  # Set max retry for each event
+  config :max_retries, :validate => :number, :default => 3
+
+  # Set retry policy for events that failed to send
+  config :retry_max_items, :validate => :number, :default => 100
+
+  # Set max interval between bulk retries
+  config :retry_max_interval, :validate => :number, :default => 5
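These three settings together bound the retry behaviour: how many times an event may be resubmitted, how many events may wait in the retry queue before back-pressure kicks in, and how long the queue may sit idle before a forced flush. A hedged usage sketch in pipeline-config form (the host value and the explicit defaults are illustrative):

    output {
      elasticsearch {
        host => "localhost"
        max_retries => 3         # drop an event after 3 failed resubmissions
        retry_max_items => 100   # block receive once this many events await retry
        retry_max_interval => 5  # force a retry flush at least every 5 seconds
      }
    }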
   public
   def register
+    @submit_mutex = Mutex.new
+    # retry-specific variables
+    @retry_flush_mutex = Mutex.new
+    @retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
+    # needs flushing when interval
+    @retry_queue_needs_flushing = ConditionVariable.new
+    @retry_queue_not_full = ConditionVariable.new
+    @retry_queue = Queue.new

     client_settings = {}

     if @protocol.nil?
       @protocol = LogStash::Environment.jruby? ? "node" : "http"
     end
@@ -288,33 +301,27 @@ def register

   @client = Array.new

-  if protocol == "node" or @host.nil? # if @protocol is "node" or @host is not set
-    options = {
-      :host => @host,
-      :port => @port,
-    }.merge(common_options)
-    @client << client_class.new(options)
+  if protocol == "node" || @host.nil? # if @protocol is "node" or @host is not set
+    options = { :host => @host, :port => @port }.merge(common_options)
+    @client = [client_class.new(options)]
   else # if @protocol in ["transport","http"]
-    @host.each do |host|
-      (_host,_port) = host.split ":"
-      options = {
-        :host => _host,
-        :port => _port || @port,
-      }.merge(common_options)
-      @logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
-      @client << client_class.new(options)
-    end # @host.each
+    @client = @host.map do |host|
+      (_host,_port) = host.split ":"
+      options = { :host => _host, :port => _port || @port }.merge(common_options)
+      @logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
+      client_class.new(options)
+    end # @host.map
   end

   if @manage_template
     for client in @client
       begin
         @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
         client.template_install(@template_name, get_template, @template_overwrite)
         break
       rescue => e
         @logger.error("Failed to install template: #{e.message}")
       end
     end # for @client loop
   end # if @manage_templates
@@ -330,8 +337,145 @@ def register
       :max_interval => @idle_flush_time,
       :logger => @logger
     )

+    @retry_timer_thread = Thread.new do
+      loop do
+        sleep(@retry_max_interval)
+        @retry_queue_needs_flushing.signal
+      end
+    end
+
+    @retry_thread = Thread.new do
+      while @retry_teardown_requested.false?
+        @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
+        retry_flush
+      end
+    end
   end # def register
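The two threads form a timer/worker pair around a single ConditionVariable: the timer wakes the worker at most every retry_max_interval seconds, and retry_push can wake it early when the queue fills. A minimal self-contained sketch of that handshake, with illustrative names (the real code above additionally gates the worker loop on a Concurrent::AtomicBoolean so teardown can stop it):

    require "thread"

    mutex  = Mutex.new
    wakeup = ConditionVariable.new
    queue  = Queue.new

    # timer thread: periodic wakeups, standing in for @retry_timer_thread
    Thread.new do
      loop do
        sleep 5
        mutex.synchronize { wakeup.signal }
      end
    end

    # worker thread: sleeps until signalled, then drains, standing in for @retry_thread
    Thread.new do
      loop do
        mutex.synchronize { wakeup.wait(mutex) }
        queue.pop until queue.empty?   # stand-in for retry_flush
      end
    end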

+  public
+  def get_template
+    if @template.nil?
+      @template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__))
+      if !File.exists?(@template)
+        raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
+      end
+    end
+    template_json = IO.read(@template).gsub(/\n/,'')
+    template = LogStash::Json.load(template_json)
+    template['template'] = wildcard_substitute(@index)
+    @logger.info("Using mapping template", :template => template)
+    return template
+  end # def get_template
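The template body itself is reused verbatim; only its "template" field is rewritten so that a single installed template covers every dated index the output writes to. A sketch of the effect (values illustrative):

    template = LogStash::Json.load('{"template": "placeholder", "settings": {}}')
    template['template'] = wildcard_substitute("logstash-%{+YYYY.MM.dd}")
    # template => {"template" => "logstash-*", "settings" => {}}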

+  public
+  def receive(event)
+    return unless output?(event)
+
+    # block until we have not maxed out our
+    # retry queue. This is applying back-pressure
+    # to slow down the receive-rate
+    @retry_queue_not_full.wait while @retry_queue.size > @retry_max_items
+
+    event['@metadata']['retry_count'] = 0
+
+    # Set the 'type' value for the index.
+    type = @index_type ? event.sprintf(@index_type) : (event["type"] || "logs")
+
+    index = event.sprintf(@index)
+
+    document_id = @document_id ? event.sprintf(@document_id) : nil
+    buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event])
+  end # def receive
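Each buffered entry is an [action, params, event] triple; with default settings it looks roughly like this (values illustrative):

    ["index", { :_id => nil, :_index => "logstash-2015.01.01", :_type => "logs" }, event]

Note the tuple now carries the event object rather than event.to_hash (which the old receive further down used), so retry_flush can still increment the @metadata retry counter.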

+  public
+  # synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the
+  # client libraries which might not be thread safe. the submit method can be called from both the
+  # Stud::Buffer flush thread and from our own retry thread.
+  def submit(actions)
+    es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }

Review comment: suggested comment:

    # synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the
    # client libraries which might not be thread safe. the submit method can be called from both the
    # Stud::Buffer flush thread and from our own retry thread.

Reply: good call

+    @submit_mutex.lock
+    begin
+      bulk_response = @current_client.bulk(es_actions)
+    ensure
+      @submit_mutex.unlock
+    end
+    if bulk_response["errors"]
+      failed_actions = actions.select.with_index {|_,i| [429, 503].include?(bulk_response['statuses'][i]) }
+      unless failed_actions.empty?
+        @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
+        retry_push(failed_actions)
+      end
+    end
+  end
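Only HTTP 429 (too many requests) and 503 (service unavailable) statuses are treated as retryable; any other per-action failure is considered permanent and is not re-queued. A standalone sketch of that selection, assuming the same response shape with a top-level "errors" flag and a per-action "statuses" array (the helper name is illustrative):

    RETRYABLE_CODES = [429, 503]

    def select_retryable(actions, bulk_response)
      return [] unless bulk_response["errors"]
      actions.select.with_index do |_, i|
        RETRYABLE_CODES.include?(bulk_response["statuses"][i])
      end
    end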

+  # Does not raise an exception to prevent Stud::Buffer from
+  # attempting to resubmit successful actions within the bulk
+  # request.
+  public
+  def flush(actions, teardown = false)
+    begin
+      @logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
+      submit(actions)
+    rescue => e
+      @logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
+    ensure
+      unless @protocol == "node"
+        @logger.debug? and @logger.debug "Shifting current elasticsearch client"
+        shift_client
+      end
+    end
+  end # def flush

+  public
+  def teardown
+    if @cacert # remove temporary jks store created from the cacert
+      File.delete(@truststore)
+    end
+
+    @retry_teardown_requested.make_true
+    # First, make sure retry_timer_thread is stopped
+    # to ensure we do not signal a retry based on
+    # the retry interval.
+    Thread.kill(@retry_timer_thread)

Review comment: we should synchronize on the mutex before calling Thread.kill:

    @retry_flush_mutex.synchronize { Thread.kill(@retry_timer_thread) }
    @retry_timer_thread.join

Review comment: I think this is a pretty safe use of Thread#kill, which should generally be avoided; see http://blog.headius.com/2008/02/ruby-threadraise-threadkill-timeoutrb.html

Reply: addressed

+    @retry_timer_thread.join
+    # Signal flushing in the case that #retry_flush is in
+    # the process of waiting for a signal.
+    @retry_queue_needs_flushing.signal
+    # Now, #retry_flush is ensured to not be in a state of
+    # waiting and can be safely joined into the main thread
+    # for further final execution of an in-process remaining call.
+    @retry_thread.join
+
+    # execute any final actions along with a proceeding retry for any
+    # final actions that did not succeed.
+    buffer_flush(:final => true)
+    retry_flush
+  end

+  @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
+
+  @@plugins.each do |plugin|
+    name = plugin.name.split('-')[-1]
+    require "logstash/outputs/elasticsearch/#{name}"
+  end
+
+  protected
+  def start_local_elasticsearch
+    @logger.info("Starting embedded Elasticsearch local node.")
+    builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
+    # Disable 'local only' - LOGSTASH-277
+    #builder.local(true)
+    builder.settings.put("cluster.name", @cluster) if @cluster
+    builder.settings.put("node.name", @node_name) if @node_name
+    builder.settings.put("network.host", @bind_host) if @bind_host
+    builder.settings.put("http.port", @embedded_http_port)
+
+    @embedded_elasticsearch = builder.node
+    @embedded_elasticsearch.start
+  end # def start_local_elasticsearch

   protected
   def shift_client
     @client_idx = (@client_idx+1) % @client.length

@@ -373,36 +517,6 @@ def setup_basic_auth
     end
   end

-  public
-  def get_template
-    if @template.nil?
-      @template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__))
-      if !File.exists?(@template)
-        raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
-      end
-    end
-    template_json = IO.read(@template).gsub(/\n/,'')
-    template = LogStash::Json.load(template_json)
-    template['template'] = wildcard_substitute(@index)
-    @logger.info("Using mapping template", :template => template)
-    return template
-  end # def get_template
-
-  protected
-  def start_local_elasticsearch
-    @logger.info("Starting embedded Elasticsearch local node.")
-    builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
-    # Disable 'local only' - LOGSTASH-277
-    #builder.local(true)
-    builder.settings.put("cluster.name", @cluster) if @cluster
-    builder.settings.put("node.name", @node_name) if @node_name
-    builder.settings.put("network.host", @bind_host) if @bind_host
-    builder.settings.put("http.port", @embedded_http_port)
-
-    @embedded_elasticsearch = builder.node
-    @embedded_elasticsearch.start
-  end # def start_local_elasticsearch

   private
   def generate_jks cert_path

@@ -426,55 +540,44 @@ def generate_jks cert_path
     [jks.path, pwd]
   end

-  public
-  def receive(event)
-    return unless output?(event)
-
-    # Set the 'type' value for the index.
-    if @index_type
-      type = event.sprintf(@index_type)
-    else
-      type = event["type"] || "logs"
-    end
-
-    index = event.sprintf(@index)
-
-    document_id = @document_id ? event.sprintf(@document_id) : nil
-    buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event.to_hash])
-  end # def receive
-
-  def flush(actions, teardown=false)
-    begin
-      @logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
-      @current_client.bulk(actions)
-    rescue => e
-      @logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
-      raise e
-    ensure
-      unless @protocol == "node"
-        @logger.debug? and @logger.debug "Shifting current elasticsearch client"
-        shift_client
-      end
-    end
-    # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
-    # (aka partially fail), we need to figure out what documents need to be
-    # retried.
-    #
-    # In the worst case, a failing flush (exception) will incur a retry from Stud::Buffer.
-  end # def flush
-
-  def teardown
-    if @cacert # remove temporary jks store created from the cacert
-      File.delete(@truststore)
-    end
-
-    buffer_flush(:final => true)
-  end
+  private
+  # in charge of submitting any actions in @retry_queue that need to be
+  # retried
+  #
+  # This method is not called concurrently. It is only called by @retry_thread
+  # and once that thread is ended during the teardown process, a final call
+  # to this method is done upon teardown in the main thread.
+  def retry_flush()
+    unless @retry_queue.empty?
+      buffer = @retry_queue.size.times.map do
+        next_action, next_doc, next_event = @retry_queue.pop
+        next_event['@metadata']['retry_count'] += 1
+
+        if next_event['@metadata']['retry_count'] > @max_retries
+          @logger.error "too many attempts at sending event. dropping: #{next_event}"

Review comment: until we have the DLQ, wondering if we should log more upon dropping the event? #{next_event.to_hash_with_metadata.inspect}? Any thoughts @jordansissel?

+          nil
+        else
+          [next_action, next_doc, next_event]
+        end
+      end.compact
+
+      submit(buffer) unless buffer.empty?
+    end
+    @retry_queue_not_full.signal if @retry_queue.size < @retry_max_items
+  end
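Because receive initialises retry_count to 0 and retry_flush increments it before comparing against max_retries, an event is attempted at most 1 + max_retries times. Worked through for the default max_retries of 3:

    # attempt 1: initial bulk submit fails  -> queued with retry_count 0
    # attempt 2: retry_count 0 + 1 = 1 <= 3 -> resubmitted
    # attempt 3: retry_count 1 + 1 = 2 <= 3 -> resubmitted
    # attempt 4: retry_count 2 + 1 = 3 <= 3 -> resubmitted
    # next pass: retry_count 3 + 1 = 4 >  3 -> logged and dropped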

-  @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
-
-  @@plugins.each do |plugin|
-    name = plugin.name.split('-')[-1]
-    require "logstash/outputs/elasticsearch/#{name}"
-  end
+  private
+  def retry_push(actions)
+    Array(actions).each{|action| @retry_queue << action}
+    @retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items
+  end

Review comment: should we wait until the queue is full before signalling? Suggested:

    # @param actions [Array|Object] a single or a collection of actions to retry
    def retry_push(actions)
      Array(actions).each { |action| @retry_queue << action }
      @retry_queue_needs_flushing.signal
    end

and the call to it would be:

    unless failed_actions.empty?
      @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
      retry_push(failed_actions)
    end

Reply: makes sense, but I'm afraid this can backfire. I would prefer a known retry schedule (interval / size) rather than possibly constantly failing and going crazy with retries. I think we can leave it as is for now; if we see practical reasons for changing it, we can go ahead and change it as you suggest.

Review comment: I suggest calling:

    unless failed_actions.empty?
      @logger.debug "#{failed_actions.size}/#{actions.size} events were unsuccessful in sending"
      retry_push(failed_actions)
    end

and:

    def retry_push(actions)
      Array(actions).each{|action| @retry_queue << action}
      @retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items
    end

Reply: I now agree
+  # helper function to replace placeholders
+  # in index names to wildcards
+  # example:
+  #   "logs-%{YYYY}" -> "logs-*"
+  private
+  def wildcard_substitute(name)
+    name.gsub(/%\{[^}]+\}/, "*")
+  end

Review comment: the wildcard_substitute helper ended up defined twice here.
Reply: woops. didn't catch this during the rebase merge
 end # class LogStash::Outputs::Elasticsearch
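For reference, the placeholder substitution behaves like this (inputs illustrative):

    wildcard_substitute("logs-%{+YYYY.MM.dd}")  # => "logs-*"
    wildcard_substitute("static-logs")          # => "static-logs" (no placeholder, unchanged)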
Review comment: I think we should move the signalling logic from the retry_flush method to here.
Reply: very true. didn't catch this during our refactoring