
Threadsafety + Exponential Backoff #390


Closed

Conversation

andrewvc
Contributor

Fixes #377
Requires elasticsearch-ruby from this branch: elastic/elasticsearch-ruby#296

This moves the sniffer to the new one in the elasticsearch-ruby client and adds two new options for connection pooling and one for health checking. The new health checks may break existing clusters where '/' is blocked for security reasons. I'm not sure what the best approach here is, since we do want a way to check whether a cluster is up without interrupting the regular execution flow.
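
For context, the health check is just a cheap HEAD request against the healthcheck path ('/' by default, per the pool code further down in this thread) on each node, kept separate from bulk traffic. A minimal standalone sketch of the idea using Ruby's stdlib (illustration only; the plugin goes through its Manticore-backed pool, not Net::HTTP):

require 'net/http'
require 'uri'

# Illustration: probe a node with a HEAD on the healthcheck path.
# If '/' is blocked for security, this probe fails even though the
# cluster may be perfectly healthy, which is the concern above.
def reachable?(url, path = '/')
  uri = URI.parse(url)
  Net::HTTP.start(uri.host, uri.port, open_timeout: 2, read_timeout: 2) do |http|
    http.head(path).is_a?(Net::HTTPSuccess)
  end
rescue StandardError
  false
end

puts reachable?('http://127.0.0.1:9200')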

The new pool_max and pool_max_per_route options set the maximum number of concurrent connections overall and the maximum number of concurrent connections to a given backend, respectively. The default values of 50 and 30 should be fine, given that the new connection selector strategy tries to spread connections evenly across backends.
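
For reference, a hedged sketch of how these two knobs map onto the underlying Manticore client (option names taken from Manticore's documented pool settings; treat this as an assumption about the plumbing, not the plugin's actual adapter code):

require 'manticore'

# pool_max caps total concurrent connections across all backends;
# pool_max_per_route caps concurrent connections to any single backend.
client = Manticore::Client.new(
  pool_max: 50,
  pool_max_per_route: 30
)
# client.get(...), client.head(...), etc. then share this pool.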

This also adds exponential backoff to connection retries, governed by two new options: retry_initial_interval, the initial amount of time to wait between retries, and retry_max_interval, the most time to wait between retries. The interval starts at retry_initial_interval and is increased exponentially between retries until a request succeeds, capped at retry_max_interval.
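
A minimal sketch of that schedule (illustration only, with example values rather than the plugin's defaults):

# Delay before retry number `attempt` (0-based): start at
# retry_initial_interval, double on each failure, cap at retry_max_interval.
def retry_delay(attempt, retry_initial_interval: 2, retry_max_interval: 64)
  [retry_initial_interval * (2**attempt), retry_max_interval].min
end

(0..6).each { |attempt| puts "failure #{attempt + 1}: wait #{retry_delay(attempt)}s" }
# => 2, 4, 8, 16, 32, 64, 64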


@andrewvc andrewvc changed the title from "DONOT MERGE Exponential threadsafe" to "Threadsafety + Exponential Backoff" on Mar 29, 2016
@andrewvc andrewvc mentioned this pull request Apr 22, 2016
port = matches[3]
info.merge :host => host, :port => port, :id => id
end
end.compact
Member

@jsvd jsvd Apr 27, 2016

sniff! doesn't seem to have any side effects, or am I reading this wrong?
update_urls should be called after mapping

Contributor Author

good catch, will fix. I'll split it into a pure and impure method.

Contributor Author

Fixed. Now check_sniff does the pure stuff.
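
For readers following along, a hedged sketch of what the split looks like (method names match the code seen elsewhere in this thread; parse_nodes is a hypothetical stand-in for the node-parsing block shown in the review context):

# Impure: applies the sniff result to the pool's internal URL list.
def sniff!
  update_urls(check_sniff)
end

# Pure: queries _nodes and returns the sniffed URLs without touching
# any internal state.
def check_sniff
  _url, resp = perform_request(:get, '_nodes')
  parse_nodes(resp.body)  # hypothetical helper for the mapping shown in the review context
end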

def close
@state_mutex.synchronize { @stopping = true }

logger.debug "Stopping resurrectionist"
Member

s/resurrectionist/sniffer

@andrewvc andrewvc force-pushed the exponential-threadsafe branch from 2dc655d to a84e4ed on May 3, 2016 21:01
@andrewvc
Contributor Author

andrewvc commented May 3, 2016

@jsvd I force-pushed a mostly squashed version, rebased off master, that incorporates most of your feedback. I will be adding additional tests soon.

@andrewvc andrewvc force-pushed the exponential-threadsafe branch 2 times, most recently from 2256a39 to 6c70531 on May 6, 2016 15:51
@andrewvc
Contributor Author

andrewvc commented May 6, 2016

Ran some performance tests. It is ~20% more efficient than the current plugin CPU-wise. That doesn't make a huge impact on wall-clock time, but it should help out some other filters!

# Old plugin. 1 worker per core
~/p/logstash ((3d875c8...)) $ curl -XDELETE 'http://127.0.0.1:9200/*'; time bin/logstash -e 'input { generator { count => 3000000 } } output { elasticsearch { workers => 8} }'
{"acknowledged":true}--- jar coordinate com.fasterxml.jackson.core:jackson-annotations already loaded with version 2.7.1 - omit version 2.7.0
--- jar coordinate com.fasterxml.jackson.core:jackson-databind already loaded with version 2.7.1 - omit version 2.7.1-1
Settings: Default pipeline workers: 8
Pipeline main started
Pipeline main has been shutdown
stopping pipeline {:id=>"main", :level=>:warn}
      135.62 real       385.20 user        27.90 sys

# New Plugin
~/p/logstash ((3d875c8...)) $ curl -XDELETE 'http://127.0.0.1:9200/*'; time bin/logstash -e 'input { generator { count => 3000000 } } output { elasticsearch { }}'
{"acknowledged":true}--- jar coordinate com.fasterxml.jackson.core:jackson-annotations already loaded with version 2.7.1 - omit version 2.7.0
--- jar coordinate com.fasterxml.jackson.core:jackson-databind already loaded with version 2.7.1 - omit version 2.7.1-1
Settings: Default pipeline workers: 8
Pipeline main started
Pipeline main has been shutdown
stopping pipeline {:id=>"main", :level=>:warn}
      125.13 real       303.91 user        25.56 sys

I also did a custom setup on the cloud benchmarker and saw these numbers on the apache logs benchmark:

# old plugin
results/benchmarks/benchmark_apachelogs_http.conf.results: 2170 events/sec . (390762 total events)
# new plugin
results/benchmarks/benchmark_apachelogs_http.conf.results: 2031 events/sec . (365737 total events)

So maybe 6% faster there. Small, but I'll take it!

@andrewvc
Contributor Author

andrewvc commented May 6, 2016

@jsvd this is ready for another round of reviews. I squashed the last version you reviewed. All subsequent commits are new.

This now has:

  1. Additional specs
  2. A fix for the missing 'sleep' calls in the infinite loops in pool.rb, which were causing a big slowdown (see the sketch after this list)
  3. Many small bug fixes
  4. Many test fixes
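
On item 2, a hedged sketch of the kind of background loop involved (the names follow pool.rb's until_stopped and the @stopping/@state_mutex fields seen in this thread; the body is illustrative, not the actual implementation):

# Run the given block repeatedly until the pool is stopping, sleeping
# between iterations; without the sleep, the sniffer and resurrectionist
# threads busy-wait and burn a core.
def until_stopped(task_name, delay)
  until @state_mutex.synchronize { @stopping }
    begin
      yield
    rescue => e
      logger.warn("Error while performing #{task_name}", :error_message => e.message)
    end
    sleep delay
  end
end

# e.g. Thread.new { until_stopped("sniffing", 5) { sniff! } }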

@andrewvc andrewvc force-pushed the exponential-threadsafe branch from 68cc454 to 55fa789 on May 6, 2016 21:26
@@ -1,3 +1,7 @@
## 4.0.0
- Make this plugin threadsafe. Workers no longer needed or supported
- Add pool_max and pool_max_per_route options
Contributor

mention the http_client here

@jsvd jsvd added the P1 and P2 labels and removed the P1 label on May 17, 2016
@jsvd
Member

jsvd commented May 18, 2016

Is "Requires elasticsearch-ruby from this branch: elastic/elasticsearch-ruby#296" still true?

ES2_SNIFF_RE_URL = /([^\/]*)?\/?([^:]*):([0-9]+)/
# Sniffs and returns the results. Does not update internal URLs!
def check_sniff
resp = perform_request('GET', '_nodes')
Member

The manticore_adapter's perform_request accepts verbs as symbols, and here we're passing a string.

Member

Also, perform_request returns not the Manticore response but an array of [url, resp], so this line needs to be more like:

url, resp = perform_request(:get, '_nodes')

Contributor Author

Good call. It is now fixed :)

@jsvd
Member

jsvd commented May 18, 2016

I haven't yet been able to make sniffing work. I added some comments about the exceptions I'm seeing and their fixes, but I'm currently at:

% bin/logstash -f ../config
--- jar coordinate com.fasterxml.jackson.core:jackson-annotations already loaded with version 2.7.1 - omit version 2.7.0
--- jar coordinate com.fasterxml.jackson.core:jackson-databind already loaded with version 2.7.1 - omit version 2.7.1-1
Settings: Default pipeline workers: 4
Pipeline main started
Error while performing sniffing {:error_message=>"Only URI objects may be passed in!", :class=>"ArgumentError", :backtrace=>["/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/http_client/pool.rb:217:in `normalize_url'", "org/jruby/RubyArray.java:2414:in `map'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/http_client/pool.rb:233:in `update_urls'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/http_client/pool.rb:126:in `sniff!'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/http_client/pool.rb:120:in `start_sniffer'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/http_client/pool.rb:104:in `until_stopped'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/http_client/pool.rb:120:in `start_sniffer'"], :level=>:warn}
^CSIGINT received. Shutting down the agent. {:level=>:warn}
stopping pipeline {:id=>"main", :level=>:warn}

@jsvd
Member

jsvd commented May 18, 2016

My current manual test against a 4 node cluster (node1 to node4, minimum_master_nodes: 3) leads to data loss when:

  1. logstash is indexing 1 million events from generator w/ sniffing true
  2. cluster is green
  3. ctrl+c the node that is both the master and the one logstash was pointing to (node1)

Waiting for logstash to terminate and then doing GET /logstash-*/_count yielded 947879.

[edit1]: second run, 975136 (instead of 1000000)

[edit2] output from logstash:

% bin/logstash -f ../config
--- jar coordinate com.fasterxml.jackson.core:jackson-annotations already loaded with version 2.7.1 - omit version 2.7.0
--- jar coordinate com.fasterxml.jackson.core:jackson-databind already loaded with version 2.7.1 - omit version 2.7.1-1
Settings: Default pipeline workers: 4
Pipeline main started
Marking url http://127.0.0.1:9200 as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] [Manticore::ClientProtocolException] 127.0.0.1:9200 failed to respond {:level=>:warn}
Attempted to send a bulk request to Elasticsearch configured at localhost, but Elasticsearch appears to be unreachable or down! {:error_message=>"[Manticore::ClientProtocolException] 127.0.0.1:9200 failed to respond", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :level=>:error}
Pipeline main has been shutdown

[edit3] This data loss does not occur if I ctrl+c another node

@jsvd
Member

jsvd commented May 18, 2016

@andrewvc I think that's enough for a first round of review? I can redo/continue whenever you want.
By the way, I used the following patch on the code to make it work:

diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb
index e8d4495..3e744eb 100644
--- a/lib/logstash/outputs/elasticsearch/common.rb
+++ b/lib/logstash/outputs/elasticsearch/common.rb
@@ -170,11 +170,11 @@ module LogStash; module Outputs; class ElasticSearch;
         es_actions = actions.map {|action_type, params, event| [action_type, params, event.to_hash]}
         response = @client.bulk(es_actions)
         response
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError
+      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
         # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
         # and let the user sort it out from there
         @logger.error(
-          "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}',"+
+          "Attempted to send a bulk request to Elasticsearch configured at localhost," + # '#{@client.client_options[:hosts]}',"+
             " but Elasticsearch appears to be unreachable or down!",
           :error_message => e.message,
           :class => e.class.name
diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
index 63542eb..b3592ed 100644
--- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -130,7 +130,7 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
     ES2_SNIFF_RE_URL  = /([^\/]*)?\/?([^:]*):([0-9]+)/
     # Sniffs and returns the results. Does not update internal URLs!
     def check_sniff
-      resp = perform_request('GET', '_nodes')
+      url, resp = perform_request(:get, '_nodes')
       parsed = LogStash::Json.load(resp.body)
       parsed['nodes'].map do |id,info|
         # TODO Make sure this works with shield. Does that listed
@@ -146,13 +146,13 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
         attributes = info['attributes']
         next if attributes && attributes['data'] == 'false'

-        matches = addr_str.match(ES1_RE_URL) || addr_str.match(ES2_RE_URL)
+        matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
         if matches
           host = matches[1].empty? ? matches[2] : matches[1]
           port = matches[3]
           info.merge :host => host, :port => port, :id => id
         end
-      end.compact
+      end.compact.map {|url_hash| URI.parse("http://#{url_hash['http_address']}") }
     end

     def stop_sniffer
@@ -176,7 +176,7 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
       @state_mutex.synchronize { @url_info.select {|url,meta| meta[:dead] } }.each do |url,meta|
         begin
           @logger.info("Checking url #{url} with path #{@healthcheck_path} to see if node resurrected")
-          perform_request_to_url(url, "HEAD", @healthcheck_path)
+          perform_request_to_url(url, :head, @healthcheck_path)
           # If no exception was raised it must have succeeded!
           logger.warn("Resurrected connection to dead ES instance at #{url}")
           @state_mutex.synchronize { meta[:dead] = false }

@andrewvc andrewvc force-pushed the exponential-threadsafe branch from c4a63f1 to 840a8d0 on May 20, 2016 21:41
@andrewvc
Contributor Author

@jsvd I've pushed additional changes and fixed the sniffing!

It should be ready for another round of review. All tests passing on my end

@jsvd
Member

jsvd commented May 23, 2016

I'm now testing with a 5 node cluster, and with a 5p/1r index I get data loss if I kill 2 nodes, making the cluster red for a while.
From what I understand, ES will fail the writes, so I think LS isn't behaving correctly here.

[edit] I can't replicate this consistently; my second test resulted in no data loss

# This sets the maximum number of open connections per endpoint the output will create.
# Setting this too low may mean frequently closing / opening connections
# which is bad.
config :pool_max_per_route, :validate => :number, :default => 100
Member

I think the terms here can get a bit confusing if you aren't aware of the underlying httpclient library being used.
In that sense, it's not obvious how a user should set these parameters for their 3, 10, or 100 node cluster.

During ingestion, how many distinct endpoints (routes) are going to be hit?
a) The number of nodes (hosts)?
b) number of nodes * 2 (_bulk for ingest and _nodes for sniff)?

We should clarify the meaning of endpoint or route here (and perhaps use only one of the terms), and also give a few hints on how to scale this value with the cluster size.

@jsvd
Member

jsvd commented May 23, 2016

I'm seeing duplicate messages in logstash during an error:

Settings: Default pipeline workers: 4
Pipeline main started
Marking url http://127.0.0.1:9200 as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] [Manticore::ClientProtocolException] 127.0.0.1:9200 failed to respond {:level=>:warn}
Marking url http://127.0.0.1:9200 as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] [Manticore::ClientProtocolException] 127.0.0.1:9200 failed to respond {:level=>:warn}
Encountered an unexpected error submitting a bulk request! Will retry. {:error_message=>"undefined method `client_options' for #<LogStash::Outputs::ElasticSearch::HttpClient:0x4d16931e>", :class=>"NoMethodError", :backtrace=>["/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:177:in `safe_bulk'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:172:in `safe_bulk'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:95:in `submit'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:71:in `retrying_submit'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:23:in `multi_receive'", "org/jruby/RubyArray.java:1653:in `each_slice'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:22:in `multi_receive'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/output_delegator.rb:132:in `threadsafe_multi_receive'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/output_delegator.rb:119:in `multi_receive'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:345:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:345:in `output_batch'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:273:in `worker_loop'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:234:in `start_workers'"], :level=>:error}
Encountered an unexpected error submitting a bulk request! Will retry. {:error_message=>"undefined method `client_options' for #<LogStash::Outputs::ElasticSearch::HttpClient:0x4d16931e>", :class=>"NoMethodError", :backtrace=>["/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:177:in `safe_bulk'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:172:in `safe_bulk'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:95:in `submit'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:71:in `retrying_submit'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:23:in `multi_receive'", "org/jruby/RubyArray.java:1653:in `each_slice'", "/Users/joaoduarte/new_plugins/logstash-output-elasticsearch/lib/logstash/outputs/elasticsearch/common.rb:22:in `multi_receive'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/output_delegator.rb:132:in `threadsafe_multi_receive'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/output_delegator.rb:119:in `multi_receive'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:345:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:345:in `output_batch'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:273:in `worker_loop'", "/Users/joaoduarte/experiments/test_es_output/logstash/logstash-core/lib/logstash/pipeline.rb:234:in `start_workers'"], :level=>:error}

@jsvd
Member

jsvd commented May 23, 2016

Killing a master node still causes data loss on a 3 node cluster; the same doesn't seem to happen in vanilla Logstash 2.3.1.

@jsvd
Member

jsvd commented May 23, 2016

One thing to be aware of with sniffing is that, because the initial connection doesn't resolve hosts to IPs, this can happen:

Elasticsearch pool adding node @ URL http://localhost:9201 {:level=>:info}
Using mapping template from {:path=>nil, :level=>:info}
Attempting to install template {:manage_template=>{"template"=>"logstash-*", "settings"=>{"index.refresh_interval"=>"5s", "index.number_of_shards"=>3}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"message_field"=>{"match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"string", "index"=>"not_analyzed"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"float"}, "longitude"=>{"type"=>"float"}}}}}}}, :level=>:info}
New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9201"], :level=>:info}
Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :level=>:info}
Pipeline main started
Elasticsearch pool adding node @ URL http://127.0.0.1:9202 {:level=>:info}
Elasticsearch pool adding node @ URL http://127.0.0.1:9201 {:level=>:info}
Elasticsearch pool adding node @ URL http://127.0.0.1:9200 {:level=>:info}
Elasticsearch pool removing node @ URL http://localhost:9201 {:level=>:info}

I guess we could resolve the host name at startup time and throw it out? We're already removing it from the pool during the first sniff, anyway.
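
A hedged sketch of that idea, resolving the configured hostname to an IP up front with Ruby's stdlib (illustration only; where exactly this would hook into the pool is left open):

require 'resolv'
require 'uri'

# Resolve the bootstrap URL's hostname to an IP so it matches the
# addresses that sniffing later reports (e.g. localhost => 127.0.0.1).
def resolve_bootstrap_url(url_string)
  uri = URI.parse(url_string)
  uri.host = Resolv.getaddress(uri.host)
  uri
end

puts resolve_bootstrap_url('http://localhost:9201')  # => http://127.0.0.1:9201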

@jsvd
Member

jsvd commented May 23, 2016

Latest data loss test:

Elasticsearch 2.3.2, 3 node cluster, all on the same machine, minimum.master.nodes set to 2:

node1 - localhost:9202
node2 - localhost:9201
node3 - localhost:9200

Logstash master (current HEAD), with local logstash-output-elasticsearch git clone using andrewvc/exponential-threadsafe branch at latest HEAD
Changed template so that primary shard count is 3 instead of 5

Logstash config:

input { generator { count => 1000000 } }
output {
  elasticsearch {
    hosts => "http://localhost:9201"
    sniffing => true
  }
}
Test script
  1. Start cluster so that node2 (localhost:9201) is master. This means logstash will connect to master
  2. Launch logstash
  3. At around 250k events ingested, ctrl+c node2 and start it again
  4. Measure /logstash-*/_count after logstash terminates
  5. Use a scroll request to check for the absence of sequence numbers from 0 to 1 million
Results

Logstash output:

logstash (git)-[master] % bin/logstash -f ../config --log.level=verbose
--- jar coordinate com.fasterxml.jackson.core:jackson-annotations already loaded with version 2.7.1 - omit version 2.7.0
--- jar coordinate com.fasterxml.jackson.core:jackson-databind already loaded with version 2.7.1 - omit version 2.7.1-1
-------- Logstash Settings (* means modified) --------- {:level=>:info}
node.name: "Joaos-MBP.lan" {:level=>:info}
*path.config: "../config" {:level=>:info}
config.test_and_exit: false {:level=>:info}
config.reload.automatic: false {:level=>:info}
config.reload.interval: 3 {:level=>:info}
metric.collect: true {:level=>:info}
path.settings: "/Users/joaoduarte/experiments/test_es_output/logstash/config" {:level=>:info}
pipeline.id: "main" {:level=>:info}
pipeline.workers: 4 {:level=>:info}
pipeline.output.workers: 1 {:level=>:info}
pipeline.batch.size: 125 {:level=>:info}
pipeline.batch.delay: 5 {:level=>:info}
pipeline.unsafe_shutdown: false {:level=>:info}
path.plugins: [] {:level=>:info}
config.debug: false {:level=>:info}
*log.level: "verbose" (default: "warn") {:level=>:info}
version: false {:level=>:info}
help: false {:level=>:info}
log.format: "plain" {:level=>:info}
http.host: "127.0.0.1" {:level=>:info}
http.port: 9600 {:level=>:info}
--------------- Logstash Settings ------------------- {:level=>:info}
starting agent {:level=>:info}
starting pipeline {:id=>"main", :level=>:info}
Elasticsearch pool adding node @ URL http://localhost:9201 {:level=>:info}
Using mapping template from {:path=>nil, :level=>:info}
Attempting to install template {:manage_template=>{"template"=>"logstash-*", "settings"=>{"index.refresh_interval"=>"5s", "index.number_of_shards"=>3}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"message_field"=>{"match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"string", "index"=>"not_analyzed"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"float"}, "longitude"=>{"type"=>"float"}}}}}}}, :level=>:info}
New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9201"], :level=>:info}
Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :level=>:info}
Pipeline main started
Elasticsearch pool adding node @ URL http://127.0.0.1:9202 {:level=>:info}
Elasticsearch pool adding node @ URL http://127.0.0.1:9200 {:level=>:info}
Elasticsearch pool adding node @ URL http://127.0.0.1:9201 {:level=>:info}
Elasticsearch pool removing node @ URL http://localhost:9201 {:level=>:info}
Marking url http://127.0.0.1:9201 as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] [Manticore::ClientProtocolException] 127.0.0.1:9201 failed to respond {:level=>:warn}
Attempted to send a bulk request to Elasticsearch configured at 'somewhere', but Elasticsearch appears to be unreachable or down! {:error_message=>"[Manticore::ClientProtocolException] 127.0.0.1:9201 failed to respond", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :level=>:error}
Checking url http://127.0.0.1:9201 with path / to see if node resurrected {:level=>:info}
Elasticsearch pool removing node @ URL http://127.0.0.1:9201 {:level=>:info}
Input plugins stopped! Will shutdown filter/output workers. {:level=>:info}
Pipeline main has been shutdown
stopping pipeline {:id=>"main", :level=>:warn}
Closing inputs {:level=>:info}
Closed inputs {:level=>:info}

Count after test:

% curl -XGET "localhost:9200/logstash-*/_count/?pretty"
{
  "count" : 989125,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "failed" : 0
  }
}

Results from querying elasticsearch for sequence numbers:

logstash-2.3.1 % bin/logstash -f check_bulk.config --quiet | sort -n | uniq > numbers && ruby check_numbers.rb
missing 10875!!! too many to show

@jsvd
Member

jsvd commented May 23, 2016

I've just run the same test on logstash-2.3.1 with no data loss; full outputs are in this gist: https://gist.github.com/jsvd/acd7d920ddefd45d25a894f97c25b020

@jsvd
Member

jsvd commented May 24, 2016

Results today are amazing, so far no data loss 👍

@jsvd
Member

jsvd commented May 24, 2016

Regarding error messages, it sometimes seems that either Manticore or HttpClient leaks through:

May 24, 2016 12:45:44 PM org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {}->http://127.0.0.1:9203: Broken pipe
May 24, 2016 12:45:44 PM org.apache.http.impl.execchain.RetryExec execute
INFO: Retrying request to {}->http://127.0.0.1:9203
Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://127.0.0.1:9203][Manticore::SocketException] Connection refused {:url=>#<URI::HTTP:0x6a379255 URL:http://127.0.0.1:9203>, :error_message=>"Elasticsearch Unreachable: [http://127.0.0.1:9203][Manticore::SocketException] Connection refused", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :level=>:warn}
Attempted to send a bulk request to elasticsearch' but Elasticsearch appears to be unreachable or down! {:error_message=>"Elasticsearch Unreachable: [http://127.0.0.1:9203][Manticore::SocketException] Connection refused", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :will_retry_in_seconds=>2, :level=>:error}

while other times:

Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://127.0.0.1:9203][Manticore::SocketException] Connection refused {:url=>#<URI::HTTP:0x45875c38 URL:http://127.0.0.1:9203>, :error_message=>"Elasticsearch Unreachable: [http://127.0.0.1:9203][Manticore::SocketException] Connection refused", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :level=>:warn}
Attempted to send a bulk request to elasticsearch' but Elasticsearch appears to be unreachable or down! {:error_message=>"Elasticsearch Unreachable: [http://127.0.0.1:9203][Manticore::SocketException] Connection refused", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :will_retry_in_seconds=>4, :level=>:error}

@andrewvc
Contributor Author

@jsvd the extra log messages from Apache HC come from having Manticore retry idempotent requests (which means sniffing + health checks).

I'll disable that setting, because we have our own retry logic.
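
For reference, a hedged sketch of turning that off at the Manticore level (assuming Manticore's :automatic_retries option; the real change lands in the plugin's adapter, so this shows the setting rather than the actual diff):

require 'manticore'

# Disable Apache HttpClient's transparent retry of idempotent requests
# so the only retries left are the plugin's own exponential-backoff ones.
client = Manticore::Client.new(automatic_retries: 0)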

def build_client
@client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
end

def close
puts "SUBMIT COUNT: #{@submit_count} AC: #{self.client.action_count} ADAPT: #{self.client.pool.adapter.submitted_times}|#{self.client.pool.adapter.submitted_lines}"
Contributor

remove print statement?

@andrewvc andrewvc force-pushed the exponential-threadsafe branch from 66c1be0 to d7bf4b0 on June 2, 2016 18:55
@andrewvc
Contributor Author

andrewvc commented Jun 2, 2016

@talevy @jsvd I think I've addressed all your concerns. Specifically, I removed Apache HC's built-in retries. I also fixed some failing tests and an issue where template installs couldn't proceed (which caused a test failure).

It could use a last pass, but it's probably good to merge!

@jsvd
Member

jsvd commented Jun 3, 2016

LGTM

Major refactor for threadsafety plus better use of filehandles.

This adds new options for the new pool based connection management / resurrectionist as well.
@andrewvc andrewvc force-pushed the exponential-threadsafe branch from d7bf4b0 to 7c183f4 on June 3, 2016 12:29
@elasticsearch-bot

Andrew Cholakian merged this into the following branches!

Branch: master
Commit: 06a4753
