Threadsafe Manticore #296

Closed · wants to merge 7 commits

16 changes: 6 additions & 10 deletions .travis.yml
@@ -6,21 +6,18 @@ sudo: false

language: ruby

cache: bundler

branches:
only:
- master
- travis

rvm:
- 1.8.7
- 1.9.3
- 2.1
- 2.2
- jruby-1.7.23
# - 1.8.7
# - 1.9.3
Contributor

We are Ruby 1.8 compatible in the low-level client; what's the reason for removing 1.8.7 from the Travis configuration?

Contributor Author

Hmm, I thought these were failing on master. I see the unit tests are passing here: https://travis-ci.org/elastic/elasticsearch-ruby/builds/120013643. I must have been mistaken.

Contributor

Some tests might have failed, but I'd like to keep 1.8 compatibility for the low-level gems (transport + api) as long as technically possible. (The Rails integration and the elasticsearch-dsl gem are already 1.9+ only.)

Contributor Author

Ruby 1.8.7-p370 was released on June 29, 2012, almost four years ago at this point.

Are people still using it?

Member

You'd be amazed :D

# - 2.1
# - 2.2

jdk:
- openjdk7

env:
- TEST_SUITE=unit
@@ -39,8 +36,7 @@ before_script:
- gem install bundler -v 1.11.2
- rake setup
- rake elasticsearch:update
- rake bundle:clean
- rake bundle:install
- echo elasticsearch-* elasticsearch | xargs -n1 -IFF bash -c 'echo processing bundle for FF && cd FF && bundle && cd ..'

script:
- SERVER=start TEST_CLUSTER_LOGS=/tmp/log TEST_BUILD_REF=tags/v$ES_VERSION TEST_CLUSTER_COMMAND=/tmp/elasticsearch-$ES_VERSION/bin/elasticsearch rake test:$TEST_SUITE
18 changes: 4 additions & 14 deletions elasticsearch-extensions/Gemfile
@@ -3,18 +3,8 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in elasticsearch-extensions.gemspec
gemspec

if File.exists? File.expand_path("../../elasticsearch-api/elasticsearch-api.gemspec", __FILE__)
gem 'elasticsearch-api', :path => File.expand_path("../../elasticsearch-api", __FILE__), :require => false
end
gem 'elasticsearch-api', :path => "../elasticsearch-api", :require => false
gem 'elasticsearch-transport', :path => "../elasticsearch-transport", :require => false
gem 'elasticsearch-extensions', :path => "../elasticsearch-extensions", :require => false

if File.exists? File.expand_path("../../elasticsearch-transport/elasticsearch-transport.gemspec", __FILE__)
gem 'elasticsearch-transport', :path => File.expand_path("../../elasticsearch-transport", __FILE__), :require => false
end

if File.exists? File.expand_path("../../elasticsearch-extensions", __FILE__)
gem 'elasticsearch-extensions', :path => File.expand_path("../../elasticsearch-extensions", __FILE__), :require => false
end

if File.exists? File.expand_path("../../elasticsearch/elasticsearch.gemspec", __FILE__)
gem 'elasticsearch', :path => File.expand_path("../../elasticsearch/", __FILE__)
end
gem 'elasticsearch', :path => "../elasticsearch/"
4 changes: 2 additions & 2 deletions elasticsearch-extensions/elasticsearch-extensions.gemspec
@@ -56,6 +56,6 @@ Gem::Specification.new do |s|
end

# Gems for testing integrations
s.add_development_dependency "patron"
s.add_development_dependency "oj"
s.add_development_dependency "patron" unless defined?(JRUBY_VERSION) || defined?(Rubinius)
s.add_development_dependency "oj" unless defined?(JRUBY_VERSION) || defined?(Rubinius)
end
5 changes: 5 additions & 0 deletions elasticsearch-transport/lib/elasticsearch/transport/client.rb
@@ -128,6 +128,11 @@ def perform_request(method, path, params={}, body=nil)
transport.perform_request method, path, params, body
end

# Shuts down the client
def close
@transport.__close_connections
Member

If this is a public API for the transport, it seems wrong to be calling a method prefixed by __.

Contributor Author

Why is that wrong? Isn't that the point of private __ methods? Shouldn't they be wrapped by more correct public ones?

I think this is a pretty sane default implementation.

Member

I agree __ should be for private methods, but here we're calling the private method from outside the @transport, right?
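A minimal sketch of the pattern being discussed, assuming the transport exposed its own public shutdown method; the `close` name on the transport is hypothetical here and not part of this PR:

```ruby
# Hypothetical sketch only: the transport wraps its conventionally-private
# __close_connections behind a public method, so callers never invoke a
# __-prefixed method from outside the object.
module Elasticsearch
  module Transport
    module Transport
      module Base
        # Public shutdown entry point (illustrative name).
        def close
          __close_connections
        end
      end
    end
  end
end

# The client would then delegate to the public method instead:
#
#   def close
#     @transport.close
#   end
```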

end

# Normalizes and returns hosts configuration.
#
# Arrayifies the `hosts_config` argument and extracts `host` and `port` info from strings.

@@ -44,7 +44,8 @@ def initialize(arguments={}, &block)
@logger = options[:logger]
@tracer = options[:tracer]

@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
sniffer_class = options.fetch(:sniffer_class, Sniffer)
@sniffer = sniffer_class.new(self, @logger)
@counter = 0
@counter_mtx = Mutex.new
@last_request_at = Time.now
@@ -165,8 +166,10 @@ def __close_connections
#
def __log(method, path, params, body, url, response, json, took, duration)
sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')

logger.info "#{method.to_s.upcase} #{sanitized_url} " +
"[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"

logger.debug "> #{__convert_to_json(body)}" if body
logger.debug "< #{response.body}"
end
@@ -197,8 +200,8 @@ def __trace(method, path, params, body, url, response, json, took, duration)
# @api private
#
def __raise_transport_error(response)
error = ERRORS[response.status] || ServerError
raise error.new "[#{response.status}] #{response.body}"
error_class = ERRORS.fetch(response.status, ServerError)
raise error_class.new(response), "[#{response.status}] #{response.body}"
end

# Converts any non-String object to JSON
@@ -260,8 +263,7 @@ def perform_request(method, path, params={}, body=nil, &block)

# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

rescue Elasticsearch::Transport::Transport::ServerError => e
rescue ::Elasticsearch::Transport::Transport::ServerError => e
if @retry_on_status.include?(response.status)
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
@@ -323,6 +325,11 @@ def perform_request(method, path, params={}, body=nil, &block)
@last_request_at = Time.now
end

def __deserialize_response(response)
serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/

end

# @abstract Returns an Array of connection errors specific to the transport implementation.
# See {HTTP::Faraday#host_unreachable_exceptions} for an example.
#

@@ -114,7 +114,7 @@ def resurrect!
#
def resurrectable?
@state_mutex.synchronize {
Time.now > @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) )
Time.now >= @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) )
Member

Can you explain the reason for this change a bit?

Contributor Author

There was an off-by-one error in the tests that had gone undiscovered for reasons I don't know. >= makes more sense logically, considering we're using the word 'since' anyway.
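For context, a minimal sketch of the boundary case the >= addresses; the timeout value and timestamps below are illustrative, not taken from the test suite:

```ruby
require 'time'

resurrect_timeout = 60                                    # illustrative option value
failures          = 1
dead_since        = Time.parse("2016-04-01 12:00:00 UTC")
threshold         = dead_since + (resurrect_timeout * 2 ** (failures - 1))

now = threshold                                           # exactly 60 seconds after death

puts(now >  threshold)  # => false -- the old check still reports the connection as dead
puts(now >= threshold)  # => true  -- the new check allows resurrection at that exact instant
```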

}
end

@@ -12,7 +12,26 @@ class SnifferTimeoutError < Timeout::Error; end

# Elasticsearch server error (HTTP status 5xx)
#
class ServerError < Error; end
class ServerError < Error;
attr_reader :response

def initialize(response)
@response = response
end
end

class HostUnreachableError < Error;
attr_reader :original_error, :url

def initialize(original_error, url)
@original_error = original_error
@url = url
end

def message
"[#{original_error.class}] #{original_error.message}"
end
end

module Errors; end

@@ -1,4 +1,7 @@
require 'manticore'
require "elasticsearch/transport/transport/http/manticore/pool"
require "elasticsearch/transport/transport/http/manticore/adapter"
require "elasticsearch/transport/transport/http/manticore/manticore_sniffer"

module Elasticsearch
module Transport
@@ -43,97 +46,138 @@ module HTTP
# @see Transport::Base
#
class Manticore
attr_reader :pool, :adapter, :options
include Base

def initialize(arguments={}, &block)
@manticore = build_client(arguments[:options] || {})
super(arguments, &block)
@options = arguments[:options] || {}
@options[:http] ||= {}
@logger = options[:logger]
@adapter = Adapter.new(logger, options)
@healthcheck_path = options[:healthcheck_path] || "/"
@pool = Manticore::Pool.new(logger, @adapter, @healthcheck_path, (arguments[:hosts] || []), 5, self.host_unreachable_exceptions, options)
@protocol = options[:protocol] || DEFAULT_PROTOCOL
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
@max_retries = options[:retry_on_failure].is_a?(Fixnum) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
Member

Maybe retriable_statuses instead? IMO, retry_on_status reads like a predicate.

Contributor Author

Agreed, but retry_on_status is the existing API in base.rb. Not worth changing IMHO.


setup_sniffing!
end

# Should just be run once at startup
def build_client(options={})
client_options = options[:transport_options] || {}
client_options[:ssl] = options[:ssl] || {}
def setup_sniffing!
if options[:sniffing] || options[:reload_connections]
# We don't support sniffers that aren't threadsafe with timers here!
sniffer_class = options[:sniffer_class] ? options[:sniffer_class] : ::Elasticsearch::Transport::Transport::HTTP::Manticore::ManticoreSniffer
raise ArgumentError, "Sniffer class #{sniffer_class} must be a ManticoreSniffer!" if sniffer_class.nil? || !sniffer_class.ancestors.include?(::Elasticsearch::Transport::Transport::HTTP::Manticore::ManticoreSniffer)
@sniffer = sniffer_class.new(self, logger)
@sniffer.sniff_every(options[:sniffer_delay] || 5) do |urls|
logger.info("Will update internal host pool with #{urls.inspect}")
@pool.update_urls(urls)
end
end
end

@manticore = ::Manticore::Client.new(client_options)
# Sniff (if enabled) to get the newest list of hosts
# then attempt to resurrect any dead URLs
def reload_connections!
if options[:sniffing]
@pool.update_urls(@sniffer.hosts)
end
@pool.resurrect_dead!
end

# Performs the request by invoking {Transport::Base#perform_request} with a block.
#
# @return [Response]
# @see Transport::Base#perform_request
#
def perform_request(method, path, params={}, body=nil)
super do |connection, url|
params[:body] = __convert_to_json(body) if body
params = params.merge @request_options
case method
when "GET"
resp = connection.connection.get(url, params)
when "HEAD"
resp = connection.connection.head(url, params)
when "PUT"
resp = connection.connection.put(url, params)
when "POST"
resp = connection.connection.post(url, params)
when "DELETE"
resp = connection.connection.delete(url, params)
body = __convert_to_json(body) if body
url, response = with_request_retries do
url, response = @pool.perform_request(method, path, params, body)
# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
[url, response]
end

enrich_response(method, url, path, params, body, response)
end

# This takes a host string to aid in debug logging
def with_request_retries
tries = 0
begin
tries += 1
yield
rescue ::Elasticsearch::Transport::Transport::ServerError => e
if @retry_on_status.include?(e.response.status)
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
Member

So if the logger exists and its level is debug, you get the same message twice?

Contributor Author

This is just copied from base.rb. I'd rather not address it in this PR.
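A tiny runnable illustration of the point (logger setup and values are illustrative): when the logger's level is DEBUG, both calls fire, so every retried attempt prints the same message twice; at WARN, only the first line appears.

```ruby
require 'logger'

logger = Logger.new($stdout)
logger.level = Logger::DEBUG                     # at Logger::WARN only the warn line prints

url, tries = "http://localhost:9200", 1          # placeholder values
e = StandardError.new("request failed")          # placeholder error

logger.warn  "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
# => the same message is emitted at both WARN and DEBUG severity
```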

logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
if tries <= max_retries
retry
else
logger.error "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
raise e
end
else
raise ArgumentError.new "Method #{method} not supported"
raise e
end
Response.new resp.code, resp.read_body, resp.headers
rescue ::Elasticsearch::Transport::Transport::HostUnreachableError => e
logger.error "[#{e.class}] #{e.message} #{e.url}" if logger

if @options[:retry_on_failure]
logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
if tries <= max_retries
if @options[:reload_on_failure] && pool.alive_urls_count == 0
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{max_retries})" if logger
reload_connections!
end

retry
else
logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
raise e
end
end
rescue Exception => e
Member

Can we just rescue => e (StandardError) instead? If the exception actually falls outside of StandardError, then we should likely let it bubble up.

Contributor Author

We re-raise it anyway. I'm always worried about other exceptions popping up on JRuby. WDYT?
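For reference, a small pure-Ruby sketch of the distinction being weighed (class names are only examples): a bare `rescue => e` is shorthand for `rescue StandardError => e`, so exceptions outside that hierarchy bypass it, which is the kind of case the author is worried about under JRuby.

```ruby
# Illustrative only -- not part of the PR.
begin
  raise ArgumentError, "a StandardError descendant"
rescue => e                        # same as `rescue StandardError => e`
  puts "plain rescue caught: #{e.class}"
end

begin
  raise SystemStackError, "inherits from Exception, not StandardError"
rescue => e
  puts "plain rescue never sees this"
rescue Exception => e              # broadest Ruby rescue clause
  puts "only rescue Exception caught: #{e.class}"
end
```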

logger.fatal "[#{e.class}] #{e.message} ()" if logger
raise e
end
end

# Builds and returns a collection of connections.
# Each connection is a Manticore::Client
#
# @return [Connections::Collection]
#
def __build_connections
@request_options = {}

if options.key?(:headers)
@request_options[:headers] = options[:headers]
def __close_connections
if @sniffer
logger.info("Closing sniffer...") if logger
@sniffer.close
end
logger.info("Sniffer closed.") if logger
logger.info("Closing pool") if logger
@pool.close # closes adapter as well
logger.info("Pool closed") if logger
end

Connections::Collection.new \
:connections => hosts.map { |host|
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
host[:port] ||= DEFAULT_PORT
def enrich_response(method, url, path, params, body, response)
start = Time.now if logger || tracer

host.delete(:user) # auth is not supported here.
host.delete(:password) # use the headers
duration = Time.now-start if logger || tracer

Connections::Connection.new \
:host => host,
:connection => @manticore
},
:selector_class => options[:selector_class],
:selector => options[:selector]
end
if response.status.to_i >= 300

__log method, path, params, body, url, response, nil, 'N/A', duration if logger
Member

I think there might be an excess of error messages if a request fails permanently: it will log one message for each failed try, and then log here again, right?

Contributor Author

This is just copied from base. I'd prefer to leave this as-is and address it in another PR, TBH. There's a lot going on here already.

__trace method, path, params, body, url, response, nil, 'N/A', duration if tracer
__log_failed response if logger
__raise_transport_error response
end

# Closes all connections by marking them as dead
# and closing the underlying HttpClient instances
#
# @return [Connections::Collection]
#
def __close_connections
# The Manticore adapter uses a single long-lived instance
# of Manticore::Client, so we don't close the connections.
json = __deserialize_response(response)
if json
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

__log method, path, params, body, url, response, json, took, duration if logger
__trace method, path, params, body, url, response, json, took, duration if tracer
end

# If the response wasn't JSON we just return it as a string
data = json || response.body
::Elasticsearch::Transport::Transport::Response.new response.status, data, response.headers
end

# Returns an array of implementation specific connection errors.
#
# @return [Array]
#
def host_unreachable_exceptions
[
::Manticore::Timeout,
::Manticore::SocketException,
::Manticore::ClientProtocolException,
::Manticore::ResolutionFailure
]
[::Manticore::Timeout,::Manticore::SocketException, ::Manticore::ClientProtocolException, ::Manticore::ResolutionFailure]
end
end
end