Skip to content

Commit 06a4753

Browse files
committed
Make this plugin a 'threadsafe' output (requires LS >= 2.2.0) Major refactor for threadsafety plus better use of filehandles.
This adds new options for the new pool based connection management / resurrectionist as well. Fixes #390
1 parent 28b348d commit 06a4753

30 files changed

+967
-557
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.0.0
2+
- Make this plugin threadsafe. Workers no longer needed or supported
3+
- Add pool_max and pool_max_per_route options
14
## 3.0.2
25
- Fix issues where URI based paths in 'hosts' would not function correctly
36
## 3.0.1

Gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
source 'https://rubygems.org'
22

3-
# Specify your gem's dependencies in logstash-mass_effect.gemspec
3+
44
gemspec

lib/logstash/outputs/elasticsearch.rb

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
# Keep in mind that a connection with keepalive enabled will
4747
# not reevaluate its DNS value while the keepalive is in effect.
4848
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
49+
declare_threadsafe!
50+
4951
require "logstash/outputs/elasticsearch/http_client"
5052
require "logstash/outputs/elasticsearch/http_client_builder"
5153
require "logstash/outputs/elasticsearch/common_configs"
@@ -130,14 +132,35 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
130132
# a timeout occurs, the request will be retried.
131133
config :timeout, :validate => :number
132134

135+
# While the output tries to reuse connections efficiently we have a maximum.
136+
# This sets the maximum number of open connections the output will create.
137+
# Setting this too low may mean frequently closing / opening connections
138+
# which is bad.
139+
config :pool_max, :validate => :number, :default => 1000
140+
141+
# While the output tries to reuse connections efficiently we have a maximum per endpoint.
142+
# This sets the maximum number of open connections per endpoint the output will create.
143+
# Setting this too low may mean frequently closing / opening connections
144+
# which is bad.
145+
config :pool_max_per_route, :validate => :number, :default => 100
146+
147+
# When a backend is marked down a HEAD request will be sent to this path in the
148+
# background to see if it has come back again before it is once again eligible
149+
# to service requests. If you have custom firewall rules you may need to change
150+
# this
151+
config :healthcheck_path, :validate => :string, :default => "/"
152+
153+
# How frequently, in seconds, to wait between resurrection attempts.
154+
# Resurrection is the process by which backend endpoints marked 'down' are checked
155+
# to see if they have come back to life
156+
config :resurrect_delay, :validate => :number, :default => 5
157+
133158
def build_client
134159
@client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
135160
end
136161

137162
def close
138163
@stopping.make_true
139-
@client.stop_sniffing!
140-
@buffer.stop
141164
end
142165

143166
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

lib/logstash/outputs/elasticsearch/buffer.rb

Lines changed: 0 additions & 124 deletions
This file was deleted.

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 90 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
require "logstash/outputs/elasticsearch/template_manager"
2-
require "logstash/outputs/elasticsearch/buffer"
32

43
module LogStash; module Outputs; class ElasticSearch;
54
module Common
@@ -13,16 +12,11 @@ def register
1312
setup_hosts # properly sets @hosts
1413
build_client
1514
install_template
16-
setup_buffer_and_handler
1715
check_action_validity
1816

1917
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts)
2018
end
2119

22-
def receive(event)
23-
@buffer << event_action_tuple(event)
24-
end
25-
2620
# Receive an array of events and immediately attempt to index them (no buffering)
2721
def multi_receive(events)
2822
events.each_slice(@flush_size) do |slice|
@@ -37,10 +31,6 @@ def event_action_tuple(event)
3731
[action, params, event]
3832
end
3933

40-
def flush
41-
@buffer.flush
42-
end
43-
4434
def setup_hosts
4535
@hosts = Array(@hosts)
4636
if @hosts.empty?
@@ -53,12 +43,6 @@ def install_template
5343
TemplateManager.install_template(self)
5444
end
5545

56-
def setup_buffer_and_handler
57-
@buffer = ::LogStash::Outputs::ElasticSearch::Buffer.new(@logger, @flush_size, @idle_flush_time) do |actions|
58-
retrying_submit(actions)
59-
end
60-
end
61-
6246
def check_action_validity
6347
raise LogStash::ConfigurationError, "No action specified!" unless @action
6448

@@ -75,33 +59,55 @@ def valid_actions
7559
VALID_HTTP_ACTIONS
7660
end
7761

78-
def retrying_submit(actions)
62+
def retrying_submit(actions)
7963
# Initially we submit the full list of actions
8064
submit_actions = actions
8165

66+
sleep_interval = @retry_initial_interval
67+
8268
while submit_actions && submit_actions.length > 0
83-
return if !submit_actions || submit_actions.empty? # If everything's a success we move along
69+
8470
# We retry with whatever didn't succeed
8571
begin
8672
submit_actions = submit(submit_actions)
73+
if submit_actions && submit_actions.size > 0
74+
@logger.error("Retrying individual actions")
75+
submit_actions.each {|action| @logger.error("Action", action) }
76+
end
8777
rescue => e
88-
@logger.warn("Encountered an unexpected error submitting a bulk request! Will retry.",
89-
:message => e.message,
78+
@logger.error("Encountered an unexpected error submitting a bulk request! Will retry.",
79+
:error_message => e.message,
9080
:class => e.class.name,
9181
:backtrace => e.backtrace)
9282
end
9383

94-
sleep @retry_max_interval if submit_actions && submit_actions.length > 0
84+
# Everything was a success!
85+
break if !submit_actions || submit_actions.empty?
86+
87+
# If we're retrying the action sleep for the recommended interval
88+
# Double the interval for the next time through to achieve exponential backoff
89+
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
90+
sleep_interval = next_sleep_interval(sleep_interval)
9591
end
9692
end
9793

98-
def submit(actions)
99-
es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash]}
94+
def sleep_for_interval(sleep_interval)
95+
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
96+
next_sleep_interval(sleep_interval)
97+
end
10098

101-
bulk_response = safe_bulk(es_actions,actions)
99+
def next_sleep_interval(current_interval)
100+
doubled = current_interval * 2
101+
doubled > @retry_max_interval ? @retry_max_interval : doubled
102+
end
102103

103-
# If there are no errors, we're done here!
104-
return unless bulk_response["errors"]
104+
def submit(actions)
105+
bulk_response = safe_bulk(actions)
106+
107+
# If the response is nil that means we were in a retry loop
108+
# and aborted since we're shutting down
109+
# If it did return and there are no errors we're good as well
110+
return if bulk_response.nil? || !bulk_response["errors"]
105111

106112
actions_to_retry = []
107113
bulk_response["items"].each_with_index do |response,idx|
@@ -168,38 +174,64 @@ def get_event_type(event)
168174
end
169175

170176
# Rescue retryable errors during bulk submission
171-
def safe_bulk(es_actions,actions)
172-
@client.bulk(es_actions)
173-
rescue Manticore::SocketException, Manticore::SocketTimeout => e
174-
# If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
175-
# and let the user sort it out from there
176-
@logger.error(
177-
"Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}',"+
178-
" but Elasticsearch appears to be unreachable or down!",
179-
:error_message => e.message,
180-
:class => e.class.name,
181-
:client_config => @client.client_options,
182-
)
183-
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)
184-
185-
# We retry until there are no errors! Errors should all go to the retry queue
186-
sleep @retry_max_interval
187-
retry unless @stopping.true?
188-
rescue => e
189-
# For all other errors print out full connection issues
190-
@logger.error(
191-
"Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
192-
" but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using " +
193-
"the configuration provided?",
194-
:error_message => e.message,
195-
:error_class => e.class.name,
196-
:backtrace => e.backtrace,
197-
:client_config => @client.client_options,
198-
)
199-
200-
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)
201-
202-
raise e
177+
def safe_bulk(actions)
178+
sleep_interval = @retry_initial_interval
179+
begin
180+
es_actions = actions.map {|action_type, params, event| [action_type, params, event.to_hash]}
181+
response = @client.bulk(es_actions)
182+
response
183+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
184+
# If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
185+
# and let the user sort it out from there
186+
@logger.error(
187+
"Attempted to send a bulk request to elasticsearch"+
188+
" but Elasticsearch appears to be unreachable or down!",
189+
:error_message => e.message,
190+
:class => e.class.name,
191+
:will_retry_in_seconds => sleep_interval
192+
)
193+
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)
194+
195+
# We retry until there are no errors! Errors should all go to the retry queue
196+
sleep_interval = sleep_for_interval(sleep_interval)
197+
retry unless @stopping.true?
198+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
199+
@logger.error(
200+
"Attempted to send a bulk request to elasticsearch, but there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
201+
:error_message => e.message,
202+
:class => e.class.name,
203+
:will_retry_in_seconds => sleep_interval
204+
)
205+
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
206+
sleep_interval = next_sleep_interval(sleep_interval)
207+
retry unless @stopping.true?
208+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
209+
if RETRYABLE_CODES.include?(e.response_code)
210+
log_hash = {:code => e.response_code, :url => e.url}
211+
log_hash[:body] = e.body if @logger.debug? # Generally this is too verbose
212+
@logger.error("Attempted to send a bulk request to elasticsearch but received a bad HTTP response code!", log_hash)
213+
214+
sleep_interval = sleep_for_interval(sleep_interval)
215+
retry unless @stopping.true?
216+
else
217+
@logger.error("Got a bad response code from server, but this code is not considered retryable. Request will be dropped", :code => e.response_code)
218+
end
219+
rescue => e
220+
# Stuff that should never happen
221+
# For all other errors print out full connection issues
222+
@logger.error(
223+
"An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
224+
:error_message => e.message,
225+
:error_class => e.class.name,
226+
:backtrace => e.backtrace
227+
)
228+
229+
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)
230+
231+
# We retry until there are no errors! Errors should all go to the retry queue
232+
sleep_interval = sleep_for_interval(sleep_interval)
233+
retry unless @stopping.true?
234+
end
203235
end
204236
end
205237
end; end; end

0 commit comments

Comments
 (0)