Threadsafe Manticore #296
Changes from all commits
6475183
5e3feeb
eef970a
af69ca1
7c09f51
1700ff8
2857ab4
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,6 +128,11 @@ def perform_request(method, path, params={}, body=nil) | |
transport.perform_request method, path, params, body | ||
end | ||
|
||
# Shuts down the client | ||
def close | ||
@transport.__close_connections | ||
Review comment: If this is a public API for the transport, it seems wrong to be calling a method prefixed by __
Reply: Why is that wrong? Isn't that the point of private I think this is a pretty sane default impl.
Reply: I agree __ should be for private methods, but here we're calling the private method from the outside of the |
||
end | ||
|
||
# Normalizes and returns hosts configuration. | ||
# | ||
# Arrayifies the `hosts_config` argument and extracts `host` and `port` info from strings. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,7 +114,7 @@ def resurrect! | |
# | ||
def resurrectable? | ||
@state_mutex.synchronize { | ||
Time.now > @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) ) | ||
Time.now >= @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) ) | ||
Review comment: Can you explain the reason for this change?
Reply: There was an off-by-one error in the tests that had previously gone undiscovered, for reasons I don't know. |
||
} | ||
end | ||
|
||
|
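To make the `>` to `>=` change in `resurrectable?` concrete, here is a minimal stand-alone sketch of the backoff check. It is not the gem's code: `resurrect_at` and the explicit `now` argument are hypothetical and stand in for `Time.now`, so the boundary case can be reproduced deterministically. `resurrect_timeout` and `failures` mirror the names in the diff.

```ruby
# Hypothetical stand-alone model of the check in `resurrectable?`.
# The wait doubles with every recorded failure.
def resurrect_at(dead_since, resurrect_timeout, failures)
  dead_since + (resurrect_timeout * 2**(failures - 1))
end

# `now` is passed in (an assumption for testability) instead of calling Time.now.
def resurrectable?(now, dead_since, resurrect_timeout, failures)
  now >= resurrect_at(dead_since, resurrect_timeout, failures)
end

dead_since = Time.at(0)

# Seconds to wait after 1, 2 and 3 failures with a 60 second timeout.
waits = (1..3).map { |f| resurrect_at(dead_since, 60, f) - dead_since }

# Exactly at the deadline `>=` answers true; a strict `>` would answer false.
at_deadline     = resurrectable?(Time.at(60), dead_since, 60, 1)
before_deadline = resurrectable?(Time.at(59), dead_since, 60, 1)
```

The two comparisons only differ when the check runs at exactly the deadline, which is the kind of case a test with a fixed or stubbed clock can hit.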
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,7 @@ | ||
require 'manticore' | ||
require "elasticsearch/transport/transport/http/manticore/pool" | ||
require "elasticsearch/transport/transport/http/manticore/adapter" | ||
require "elasticsearch/transport/transport/http/manticore/manticore_sniffer" | ||
|
||
module Elasticsearch | ||
module Transport | ||
|
@@ -43,97 +46,138 @@ module HTTP | |
# @see Transport::Base | ||
# | ||
class Manticore | ||
attr_reader :pool, :adapter, :options | ||
include Base | ||
|
||
def initialize(arguments={}, &block) | ||
@manticore = build_client(arguments[:options] || {}) | ||
super(arguments, &block) | ||
@options = arguments[:options] || {} | ||
@options[:http] ||= {} | ||
@logger = options[:logger] | ||
@adapter = Adapter.new(logger, options) | ||
@healthcheck_path = options[:healthcheck_path] || "/" | ||
@pool = Manticore::Pool.new(logger, @adapter, @healthcheck_path, (arguments[:hosts] || []), 5, self.host_unreachable_exceptions, options) | ||
@protocol = options[:protocol] || DEFAULT_PROTOCOL | ||
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) ) | ||
@max_retries = options[:retry_on_failure].is_a?(Fixnum) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES | ||
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i } | ||
Review comment: maybe
Reply: Agreed, but |
||
|
||
setup_sniffing! | ||
end | ||
|
||
# Should just be run once at startup | ||
def build_client(options={}) | ||
client_options = options[:transport_options] || {} | ||
client_options[:ssl] = options[:ssl] || {} | ||
def setup_sniffing! | ||
if options[:sniffing] || options[:reload_connections] | ||
# We don't support sniffers that aren't threadsafe with timers here! | ||
sniffer_class = options[:sniffer_class] ? options[:sniffer_class] : ::Elasticsearch::Transport::Transport::HTTP::Manticore::ManticoreSniffer | ||
raise ArgumentError, "Sniffer class #{sniffer_class} must be a ManticoreSniffer!" if sniffer_class.nil? || !sniffer_class.ancestors.include?(::Elasticsearch::Transport::Transport::HTTP::Manticore::ManticoreSniffer) | ||
@sniffer = sniffer_class.new(self, logger) | ||
@sniffer.sniff_every(options[:sniffer_delay] || 5) do |urls| | ||
logger.info("Will update internal host pool with #{urls.inspect}") | ||
@pool.update_urls(urls) | ||
end | ||
end | ||
end | ||
|
||
@manticore = ::Manticore::Client.new(client_options) | ||
# Sniff (if enabled) to get the newest list of hosts | ||
# then attempt to resurrect any dead URLs | ||
def reload_connections! | ||
if options[:sniffing] | ||
@pool.update_urls(@sniffer.hosts) | ||
end | ||
@pool.resurrect_dead! | ||
end | ||
|
||
# Performs the request by invoking {Transport::Base#perform_request} with a block. | ||
# | ||
# @return [Response] | ||
# @see Transport::Base#perform_request | ||
# | ||
def perform_request(method, path, params={}, body=nil) | ||
super do |connection, url| | ||
params[:body] = __convert_to_json(body) if body | ||
params = params.merge @request_options | ||
case method | ||
when "GET" | ||
resp = connection.connection.get(url, params) | ||
when "HEAD" | ||
resp = connection.connection.head(url, params) | ||
when "PUT" | ||
resp = connection.connection.put(url, params) | ||
when "POST" | ||
resp = connection.connection.post(url, params) | ||
when "DELETE" | ||
resp = connection.connection.delete(url, params) | ||
body = __convert_to_json(body) if body | ||
url, response = with_request_retries do | ||
url, response = @pool.perform_request(method, path, params, body) | ||
# Raise an exception so we can catch it for `retry_on_status` | ||
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) | ||
[url, response] | ||
end | ||
|
||
enrich_response(method, url, path, params, body, response) | ||
end | ||
|
||
# This takes a host string to aid in debug logging | ||
def with_request_retries | ||
tries = 0 | ||
begin | ||
tries += 1 | ||
yield | ||
rescue ::Elasticsearch::Transport::Transport::ServerError => e | ||
if @retry_on_status.include?(e.response.status) | ||
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger | ||
Review comment: So if logger exists and level == debug you get the same message twice?
Reply: This is just copied from |
||
logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger | ||
if tries <= max_retries | ||
retry | ||
else | ||
logger.error "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger | ||
raise e | ||
end | ||
else | ||
raise ArgumentError.new "Method #{method} not supported" | ||
raise e | ||
end | ||
Response.new resp.code, resp.read_body, resp.headers | ||
rescue ::Elasticsearch::Transport::Transport::HostUnreachableError => e | ||
logger.error "[#{e.class}] #{e.message} #{e.url}" if logger | ||
|
||
if @options[:retry_on_failure] | ||
logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger | ||
if tries <= max_retries | ||
if @options[:reload_on_failure] && pool.alive_urls_count == 0 | ||
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{max_retries})" if logger | ||
reload_connections! | ||
end | ||
|
||
retry | ||
else | ||
logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger | ||
raise e | ||
end | ||
end | ||
rescue Exception => e | ||
Review comment: Can we just
Reply: We re-raise it anyway. I'm always worried about other exceptions popping up on JRuby. WDYT? |
||
logger.fatal "[#{e.class}] #{e.message} ()" if logger | ||
raise e | ||
end | ||
end | ||
|
||
# Builds and returns a collection of connections. | ||
# Each connection is a Manticore::Client | ||
# | ||
# @return [Connections::Collection] | ||
# | ||
def __build_connections | ||
@request_options = {} | ||
|
||
if options.key?(:headers) | ||
@request_options[:headers] = options[:headers] | ||
def __close_connections | ||
if @sniffer | ||
logger.info("Closing sniffer...") if logger | ||
@sniffer.close | ||
end | ||
logger.info("Sniffer closed.") if logger | ||
logger.info("Closing pool") if logger | ||
@pool.close # closes adapter as well | ||
logger.info("Pool closed") if logger | ||
end | ||
|
||
Connections::Collection.new \ | ||
:connections => hosts.map { |host| | ||
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL | ||
host[:port] ||= DEFAULT_PORT | ||
def enrich_response(method, url, path, params, body, response) | ||
start = Time.now if logger || tracer | ||
|
||
host.delete(:user) # auth is not supported here. | ||
host.delete(:password) # use the headers | ||
duration = Time.now-start if logger || tracer | ||
|
||
Connections::Connection.new \ | ||
:host => host, | ||
:connection => @manticore | ||
}, | ||
:selector_class => options[:selector_class], | ||
:selector => options[:selector] | ||
end | ||
if response.status.to_i >= 300 | ||
Review comment: From what I see, response.status is already a number: https://github.com/cheald/manticore/blob/d253390ab3f3e39bb87c113b638c776edd929cde/spec/manticore/response_spec.rb#L19-L21 |
||
__log method, path, params, body, url, response, nil, 'N/A', duration if logger | ||
Review comment: I think there might be an excess of error messages if a request fails permanently: it will log 1 message for each failed try, and then log here again, right?
Reply: This is just copied from |
||
__trace method, path, params, body, url, response, nil, 'N/A', duration if tracer | ||
__log_failed response if logger | ||
__raise_transport_error response | ||
end | ||
|
||
# Closes all connections by marking them as dead | ||
# and closing the underlying HttpClient instances | ||
# | ||
# @return [Connections::Collection] | ||
# | ||
def __close_connections | ||
# The Manticore adapter uses a single long-lived instance | ||
# of Manticore::Client, so we don't close the connections. | ||
json = __deserialize_response(response) | ||
if json | ||
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer | ||
|
||
__log method, path, params, body, url, response, json, took, duration if logger | ||
__trace method, path, params, body, url, response, json, took, duration if tracer | ||
end | ||
|
||
# If the response wasn't JSON we just return it as a string | ||
data = json || response.body | ||
::Elasticsearch::Transport::Transport::Response.new response.status, data, response.headers | ||
end | ||
|
||
# Returns an array of implementation specific connection errors. | ||
# | ||
# @return [Array] | ||
# | ||
def host_unreachable_exceptions | ||
[ | ||
::Manticore::Timeout, | ||
::Manticore::SocketException, | ||
::Manticore::ClientProtocolException, | ||
::Manticore::ResolutionFailure | ||
] | ||
[::Manticore::Timeout,::Manticore::SocketException, ::Manticore::ClientProtocolException, ::Manticore::ResolutionFailure] | ||
end | ||
end | ||
end | ||
|
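The retry flow in `with_request_retries` above can be sketched in isolation as follows. This is an illustration under stated assumptions, not the PR's implementation: `RetryableError` and `with_retries` are hypothetical names, and the target is passed in as `label` because the sketch needs something to put in the log message. As in the diff, `tries <= max_retries` allows one initial attempt plus `max_retries` retries.

```ruby
# Illustrative only: RetryableError and with_retries are hypothetical names,
# not part of elasticsearch-transport or Manticore.
class RetryableError < StandardError; end

# Runs the block and retries on RetryableError, for a total of
# max_retries + 1 attempts. The last failure is re-raised to the caller.
def with_retries(max_retries, label)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue RetryableError => e
    if tries <= max_retries
      # Logged once per failed attempt, to stderr in this sketch.
      warn "[#{e.class}] Attempt #{tries} to get response from #{label}"
      retry
    end
    raise
  end
end

attempts = []
result = with_retries(3, "http://localhost:9200") do |n|
  attempts << n
  raise RetryableError, "simulated failure" if n < 3
  "ok on attempt #{n}"
end
```

Logging exactly once per failed attempt inside the loop, and leaving the final report to the caller that receives the re-raised error, is one way to address the duplicate-message concerns raised in the review comments.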
Review comment: We are Ruby 1.8 compatible in the low-level client, what's the reason for removing 1.8.7 from the Travis configuration?
Reply: Hmm, I thought these were failing on master. I see the unit tests are passing here: https://travis-ci.org/elastic/elasticsearch-ruby/builds/120013643 . I must have been mistaken.
Reply: Some tests might have failed, but I'd like to keep 1.8 compatibility for the low-level gems (transport+api) as long as technically possible. (The Rails integration and the elasticsearch-dsl gem are already 1.9+ only.)
Reply: Ruby 1.8.7-p370 was released Jun 29, 2012, almost four years ago at this point. Are people still using it?
Reply: You'd be amazed :D