Threadsafety + Exponential Backoff #390

Conversation
    port = matches[3]
    info.merge :host => host, :port => port, :id => id
  end
end.compact
sniff! doesn't seem to have any side effects, or am I reading this wrong?
update_urls should be called after the mapping.
Good catch, will fix. I'll split it into a pure and an impure method.
Fixed. Now check_sniff does the pure stuff.
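For illustration, a minimal sketch of the pure/impure split being described (update_urls and parse_sniff_response are hypothetical helper names; only sniff! and check_sniff appear in this PR):

```ruby
# Impure entry point: performs the side effect of updating the pool's URL list.
def sniff!
  update_urls(check_sniff)
end

# Pure part: fetches and parses _nodes and returns the sniffed URLs without
# mutating any state. perform_request returns [url, response], per the
# review notes later in this thread.
def check_sniff
  _url, resp = perform_request(:get, '_nodes')
  parse_sniff_response(resp) # hypothetical parsing helper
end
```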
def close
  @state_mutex.synchronize { @stopping = true }

  logger.debug "Stopping resurrectionist"
s/resurrectionist/sniffer
Force-pushed 2dc655d to a84e4ed
@jsvd I force-pushed a mostly squashed version, rebased off master, that incorporates most of your feedback. I will be adding additional tests soon.
Force-pushed 2256a39 to 6c70531
Ran some performance tests. It is ~20% more efficient than the current plugin CPU-wise. It doesn't make a huge impact on the wall clock, but should help out some other filters!
I also did a custom setup on the cloud benchmarker and saw these numbers on the apache logs benchmark:
So maybe 6% faster there. Small, but I'll take it!
@jsvd this is ready for another round of reviews. I squashed the last version you reviewed; all subsequent commits are new. This now has:
Force-pushed 68cc454 to 55fa789
@@ -1,3 +1,7 @@
## 4.0.0
- Make this plugin threadsafe. Workers no longer needed or supported
- Add pool_max and pool_max_per_route options
mention the http_client here
ES2_SNIFF_RE_URL = /([^\/]*)?\/?([^:]*):([0-9]+)/
# Sniffs and returns the results. Does not update internal URLs!
def check_sniff
  resp = perform_request('GET', '_nodes')
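For reference, a rough look at what ES2_SNIFF_RE_URL captures; the sample addresses below are assumptions based on typical ES publish_address shapes (host/ip:port or /ip:port), not output from this PR:

```ruby
re = /([^\/]*)?\/?([^:]*):([0-9]+)/  # same pattern as ES2_SNIFF_RE_URL above

# With a hostname prefix, the host lands in capture 1:
"example.org/127.0.0.1:9200".match(re).captures # => ["example.org", "127.0.0.1", "9200"]

# With a leading slash, capture 1 is empty and the IP lands in capture 2,
# which is why the code quoted later picks matches[2] only when matches[1]
# is empty:
"/127.0.0.1:9200".match(re).captures # => ["", "127.0.0.1", "9200"]
```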
The manticore_adapter's perform_request accepts verbs as symbols, and here we're passing a string.
Also, perform_request returns not the Manticore response but an array of [url, resp], so this line needs to be more like:
url, resp = perform_request(:get, '_nodes')
Good call. It is now fixed :)
I haven't yet been able to make sniffing work. I added some comments about the exceptions I'm seeing and their fixes, but I'm currently at:
My current manual test against a 4-node cluster (node1 to node4, minimum_master_nodes: 3) leads to data loss when I wait for logstash to terminate and then do:
[edit1]: second run, 975136 events (instead of 1000000)
[edit2] output from logstash:
[edit3] This data loss does not occur if I ctrl+c another node
@andrewvc I think that's enough for a first round of review? I can redo/continue whenever you want.

diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb
index e8d4495..3e744eb 100644
--- a/lib/logstash/outputs/elasticsearch/common.rb
+++ b/lib/logstash/outputs/elasticsearch/common.rb
@@ -170,11 +170,11 @@ module LogStash; module Outputs; class ElasticSearch;
es_actions = actions.map {|action_type, params, event| [action_type, params, event.to_hash]}
response = @client.bulk(es_actions)
response
- rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError
+ rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
# If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
# and let the user sort it out from there
@logger.error(
- "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}',"+
+ "Attempted to send a bulk request to Elasticsearch configured at localhost," + # '#{@client.client_options[:hosts]}',"+
" but Elasticsearch appears to be unreachable or down!",
:error_message => e.message,
:class => e.class.name
diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
index 63542eb..b3592ed 100644
--- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -130,7 +130,7 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
ES2_SNIFF_RE_URL = /([^\/]*)?\/?([^:]*):([0-9]+)/
# Sniffs and returns the results. Does not update internal URLs!
def check_sniff
- resp = perform_request('GET', '_nodes')
+ url, resp = perform_request(:get, '_nodes')
parsed = LogStash::Json.load(resp.body)
parsed['nodes'].map do |id,info|
# TODO Make sure this works with shield. Does that listed
@@ -146,13 +146,13 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
attributes = info['attributes']
next if attributes && attributes['data'] == 'false'
- matches = addr_str.match(ES1_RE_URL) || addr_str.match(ES2_RE_URL)
+ matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
if matches
host = matches[1].empty? ? matches[2] : matches[1]
port = matches[3]
info.merge :host => host, :port => port, :id => id
end
- end.compact
+ end.compact.map {|url_hash| URI.parse("http://#{url_hash['http_address']}") }
end
def stop_sniffer
@@ -176,7 +176,7 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:dead] } }.each do |url,meta|
begin
@logger.info("Checking url #{url} with path #{@healthcheck_path} to see if node resurrected")
- perform_request_to_url(url, "HEAD", @healthcheck_path)
+ perform_request_to_url(url, :head, @healthcheck_path)
# If no exception was raised it must have succeeded!
logger.warn("Resurrected connection to dead ES instance at #{url}")
@state_mutex.synchronize { meta[:dead] = false }
Force-pushed c4a63f1 to 840a8d0
@jsvd I've pushed additional changes and fixed the sniffing! It should be ready for another round of review. All tests passing on my end.
I'm now testing with a 5-node cluster, and with a 5p/1r (5 primaries, 1 replica) index I get data loss if I kill 2 nodes, making the cluster red for a while. [edit] I can't replicate this consistently; my second test resulted in no data loss.
# This sets the maximum number of open connections per endpoint the output will create.
# Setting this too low may mean frequently closing / opening connections
# which is bad.
config :pool_max_per_route, :validate => :number, :default => 100
I think the terms here can get a bit confusing if you aren't aware of the underlying httpclient library being used.
In that sense it's not obvious how a user should set these parameters for their 3-, 10-, or 100-node cluster.
During ingestion, how many distinct endpoints (routes) are going to be hit?
a) The number of nodes (hosts)?
b) The number of nodes * 2 (_bulk for ingest and _nodes for sniffing)?
We should clarify the meaning of endpoint or route here (and perhaps use only one of the terms), and also give a few hints on how to scale this value with cluster size; see the sketch below.
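To make that concrete, here is roughly how these options map onto the underlying Manticore/Apache HC connection pool; the values are illustrative for a 10-node cluster, not recommendations from this PR:

```ruby
require 'manticore'

# A "route" is one scheme://host:port endpoint, so during ingest the number of
# routes is roughly the number of ES nodes being written to.
client = Manticore::Client.new(
  :pool_max           => 50, # total concurrent connections across all routes
  :pool_max_per_route => 10  # concurrent connections to any single node
)
```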
I'm seeing duplicate messages in logstash during an error:
Killing a master node still causes data loss on a 3-node cluster; the same doesn't seem to happen in vanilla Logstash 2.3.1.
One thing to be aware of with sniffing is that, because the initial connection doesn't resolve hosts to IPs, this can happen:
I guess we could resolve the host name at startup time and throw it out? We're already removing it from the pool during the first sniff, anyway.
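A minimal sketch of that startup-time resolution idea (hosts is assumed to be the plugin's configured host list, with entries as full URLs like http://node1:9200; error handling omitted):

```ruby
require 'uri'
require 'resolv'

# Resolve each configured hostname to an IP once, up front, so the unresolved
# seed entry can be dropped from the pool after the first sniff.
seed_urls = hosts.map do |h|
  uri = URI.parse(h)
  uri.host = Resolv.getaddress(uri.host) # raises Resolv::ResolvError if unresolvable
  uri
end
```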
Latest data loss test: Elasticsearch 2.3.2, 3-node cluster, all on the same machine, minimum.master.nodes set to 2:
node1 - localhost:9202
Logstash master (current HEAD), with a local logstash-output-elasticsearch git clone using the andrewvc/exponential-threadsafe branch at latest HEAD.
Logstash config:
Test script:
Results
Logstash output:
Count after test:
Results from querying elasticsearch for sequence numbers:
I've just run the same test on logstash-2.3.1, with no data loss, full outputs in gist: https://gist.github.com/jsvd/acd7d920ddefd45d25a894f97c25b020
Results today are amazing: so far, no data loss 👍
Regarding error messages, sometimes it seems either Manticore or httpclient leaks through:
while other times:
@jsvd the extra log messages from Apache HC come from having Manticore retry idempotent requests (that covers sniffing + health checks). I'll disable that setting because we have our own retry logic.
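The knob in question is presumably Manticore's automatic_retries option (it defaults to retrying idempotent requests a few times); a sketch of turning it off, with the exact wiring into the plugin's adapter left as an assumption:

```ruby
require 'manticore'

# With automatic_retries set to 0, Apache HC stops re-issuing idempotent
# requests itself, leaving retries entirely to the plugin's own backoff logic.
client = Manticore::Client.new(:automatic_retries => 0)
```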
def build_client
  @client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
end

def close
  puts "SUBMIT COUNT: #{@submit_count} AC: #{self.client.action_count} ADAPT: #{self.client.pool.adapter.submitted_times}|#{self.client.pool.adapter.submitted_lines}"
remove print statement?
Force-pushed 66c1be0 to d7bf4b0
LGTM
Major refactor for threadsafety plus better use of filehandles. This also adds new options for the new pool-based connection management / resurrectionist.
Force-pushed d7bf4b0 to 7c183f4
Andrew Cholakian merged this into the following branches!
Fixes #377
Requires elasticsearch-ruby from this branch: elastic/elasticsearch-ruby#296
This moves the sniffer to the new one in the elasticsearch_ruby client and adds two new options for connection pooling and one for health checking. The new concept of health checks may break existing clusters where '/' is blocked for security. I'm not sure what the best approach here is since we do want a way to check if a cluster is up without interrupting regular execution flow.
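For context, a rough sketch of the health-check concept as described (healthy? is a hypothetical name; perform_request_to_url, @healthcheck_path, and HostUnreachableError come from the pool code quoted in this thread):

```ruby
# A HEAD request to the configured healthcheck path (default '/') confirms a
# node is reachable without sending real traffic through it.
def healthy?(url)
  perform_request_to_url(url, :head, @healthcheck_path)
  true
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError
  false
end
```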
The new pool_max and pool_max_per_route options set the maximum total number of concurrent connections and the maximum number of concurrent connections to a given backend, respectively. The default values of 50 and 30 should be fine, given that the new connection selector strategy tries to spread connections evenly across backends.

This also adds exponential backoff to connection retries, with a ceiling of retry_max_interval (the most time to wait between retries) and retry_initial_interval (the initial amount of time to wait). retry_initial_interval will be increased exponentially between retries until a request succeeds.
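A minimal sketch of that backoff schedule (the helper name and doubling factor are illustrative; the plugin's actual internals may differ):

```ruby
# Retry a block until it succeeds, doubling the wait between attempts,
# capped at retry_max_interval.
def with_exponential_backoff(retry_initial_interval, retry_max_interval)
  interval = retry_initial_interval
  begin
    yield
  rescue
    sleep interval
    interval = [interval * 2, retry_max_interval].min
    retry
  end
end

# e.g. with_exponential_backoff(2, 64) { perform_request(:post, '_bulk') }
```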