WIP Threadsafe connection pooling #284

Closed
1 change: 1 addition & 0 deletions elasticsearch-transport/lib/elasticsearch/transport.rb
@@ -6,6 +6,7 @@

require "elasticsearch/transport/transport/serializer/multi_json"
require "elasticsearch/transport/transport/sniffer"
require "elasticsearch/transport/transport/manticore_sniffer"
require "elasticsearch/transport/transport/response"
require "elasticsearch/transport/transport/errors"
require "elasticsearch/transport/transport/base"
5 changes: 5 additions & 0 deletions elasticsearch-transport/lib/elasticsearch/transport/client.rb
@@ -125,6 +125,11 @@ def perform_request(method, path, params={}, body=nil)
transport.perform_request method, path, params, body
end

# Shuts down the client
def close
@transport.__close_connections
end

# Normalizes and returns hosts configuration.
#
# Arrayifies the `hosts_config` argument and extracts `host` and `port` info from strings.
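A minimal usage sketch of the new `close` method (illustrative only, not part of the diff; it assumes a client built from this branch with the Manticore transport, which is the transport that implements `__close_connections` here):

require 'elasticsearch/transport'

client = Elasticsearch::Transport::Client.new(
  hosts: [{ host: 'localhost', port: 9200 }],
  transport_class: Elasticsearch::Transport::Transport::HTTP::Manticore
)
client.perform_request 'GET', '_cluster/health'
client.close # delegates to Transport#__close_connections and releases pooled connections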
135 changes: 80 additions & 55 deletions elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb
@@ -39,18 +39,19 @@ def initialize(arguments={}, &block)
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
@protocol = options[:protocol] || DEFAULT_PROTOCOL

@logger = options[:logger]
@tracer = options[:tracer]

@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
@counter = 0
@counter_mtx = Mutex.new
@last_request_at = Time.now
@reload_connections = options[:reload_connections]
@reload_after = options[:reload_connections].is_a?(Fixnum) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
@resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
@max_retries = options[:retry_on_failure].is_a?(Fixnum) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
@logger = options[:logger]
@tracer = options[:tracer]

sniffer_options = [logger, ]
@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
@counter = 0
@counter_mtx = Mutex.new
@last_request_at = Time.now
@reload_connections = options[:reload_connections]
@reload_after = options[:reload_connections].is_a?(Fixnum) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
@resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
@max_retries = options[:retry_on_failure].is_a?(Fixnum) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
end

# Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.
@@ -148,8 +149,9 @@ def __trace(method, path, params, body, url, response, json, took, duration)
#
# @api private
def __raise_transport_error(response)
error = ERRORS[response.status] || ServerError
raise error.new "[#{response.status}] #{response.body}"
puts "SET SERVER ERROR TO #{response}"
error = ERRORS[response.status].new(response) || ServerError.new(response)
raise error, "[#{response.status}] #{response.body}"
end

# Converts any non-String object to JSON
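An aside on the lookup in `__raise_transport_error` above: `ERRORS` maps known HTTP status codes to generated error classes, and `ServerError` is the fallback for anything unmapped. Illustrative values:

ERRORS[404] # => Elasticsearch::Transport::Transport::Errors::NotFound
ERRORS[599] # => nil, so the fallback ServerError class is used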
@@ -189,42 +191,56 @@ def __full_url(host)
# @raise [ServerError] If request failed on server
# @raise [Error] If no connection is available
#
def enriching_response(method, path, params, body)
start = Time.now if logger || tracer

url, response = yield

duration = Time.now-start if logger || tracer

if response.status.to_i >= 300
__log method, path, params, body, url, response, nil, 'N/A', duration if logger
__trace method, path, params, body, url, response, nil, 'N/A', duration if tracer
__log_failed response if logger
__raise_transport_error response
end

json = deserialize_response(response)

took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

__log method, path, params, body, url, response, json, took, duration if logger
__trace method, path, params, body, url, response, json, took, duration if tracer


::Elasticsearch::Transport::Transport::Response.new response.status, json || response.body, response.headers
end

def perform_request(method, path, params={}, body=nil, &block)
raise NoMethodError, "Implement this method in your transport class" unless block_given?
start = Time.now if logger || tracer
tries = 0

begin
tries += 1
connection = get_connection or raise Error.new("Cannot get new connection from pool.")

if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
params = connection.connection.params.merge(params.to_hash)
end
begin
with_request_retries do
response = enriching_response(method, path, params, body) do
connection = get_connection or raise Error.new("Cannot get new connection from pool.")

url = connection.full_url(path, params)
if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
params = connection.connection.params.merge(params.to_hash)
end

response = block.call(connection, url)
url = connection.full_url(path, params)

connection.healthy! if connection.failures > 0
block_response = block.call(connection, url)

# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
connection.healthy! if connection.failures > 0

rescue Elasticsearch::Transport::Transport::ServerError => e
if @retry_on_status.include?(response.status)
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
if tries <= max_retries
retry
else
logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
raise e
[url, block_response]
end
else
raise e
end

# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
end
rescue *host_unreachable_exceptions => e
logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger

@@ -253,26 +269,35 @@ def perform_request(method, path, params={}, body=nil, &block)

end #/begin

duration = Time.now-start if logger || tracer

if response.status.to_i >= 300
__log method, path, params, body, url, response, nil, 'N/A', duration if logger
__trace method, path, params, body, url, response, nil, 'N/A', duration if tracer
__log_failed response if logger
__raise_transport_error response
end

json = serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

__log method, path, params, body, url, response, json, took, duration if logger
__trace method, path, params, body, url, response, json, took, duration if tracer

Response.new response.status, json || response.body, response.headers
response
ensure
@last_request_at = Time.now
end

def deserialize_response(response)
serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/
end

# Retries the wrapped request when the server responds with one of the
# configured `retry_on_status` codes, up to `max_retries` attempts.
def with_request_retries
tries = 0
begin
tries += 1
yield
rescue Elasticsearch::Transport::Transport::ServerError => e
if e.response && @retry_on_status.include?(e.response.status)
logger.debug "[#{e.class}] Attempt #{tries} to get response" if logger
if tries <= max_retries
retry
else
logger.fatal "[#{e.class}] Cannot get response after #{tries} tries" if logger
raise e
end
else
raise e
end
end
end

# @abstract Returns an Array of connection errors specific to the transport implementation.
# See {HTTP::Faraday#host_unreachable_exceptions} for an example.
#
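For context, a hedged sketch of how the retry path above is driven from client configuration (both options already exist in the client; the values are illustrative):

client = Elasticsearch::Transport::Client.new(
  hosts: [{ host: 'localhost', port: 9200 }],
  retry_on_status: [502, 503], # statuses that with_request_retries will retry
  retry_on_failure: 3          # stored as @max_retries in Transport::Base
)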
@@ -12,7 +12,13 @@ class SnifferTimeoutError < Timeout::Error; end

# Elasticsearch server error (HTTP status 5xx)
#
class ServerError < Error; end
class ServerError < Error
attr_reader :response

# Carries the HTTP response so retry logic can inspect its status code;
# the argument is optional so call sites without a response still work
def initialize(response = nil)
@response = response
end
end

module Errors; end

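An illustrative sketch (not part of the diff) of what the new `response` reader enables: callers, and `with_request_retries` in base.rb, can branch on the HTTP status carried by the error. The endpoint name below is hypothetical:

begin
  client.perform_request 'GET', '_bogus_endpoint'
rescue Elasticsearch::Transport::Transport::ServerError => e
  retryable = e.response && [502, 503].include?(e.response.status)
end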
@@ -1,4 +1,6 @@
require 'manticore'
require "elasticsearch/transport/transport/http/manticore/pool"
require "elasticsearch/transport/transport/http/manticore/adapter"

module Elasticsearch
module Transport
@@ -43,97 +45,46 @@ module HTTP
# @see Transport::Base
#
class Manticore
attr_reader :pool, :adapter, :options
include Base

def initialize(arguments={}, &block)
@manticore = build_client(arguments[:options] || {})
super(arguments, &block)
end

# Should just be run once at startup
def build_client(options={})
client_options = options[:transport_options] || {}
client_options[:ssl] = options[:ssl] || {}

@manticore = ::Manticore::Client.new(client_options)
end
@options = arguments[:options]
@logger = options[:logger]
@adapter = Adapter.new(logger, options)
# TODO handle HTTPS
@pool = Manticore::Pool.new(logger, @adapter, arguments[:hosts].map {|h| URI::HTTP.build(h).to_s})
@protocol = options[:protocol] || DEFAULT_PROTOCOL
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }

# Performs the request by invoking {Transport::Base#perform_request} with a block.
#
# @return [Response]
# @see Transport::Base#perform_request
#
def perform_request(method, path, params={}, body=nil)
super do |connection, url|
params[:body] = __convert_to_json(body) if body
params = params.merge @request_options
case method
when "GET"
resp = connection.connection.get(url, params)
when "HEAD"
resp = connection.connection.head(url, params)
when "PUT"
resp = connection.connection.put(url, params)
when "POST"
resp = connection.connection.post(url, params)
when "DELETE"
resp = connection.connection.delete(url, params)
else
raise ArgumentError.new "Method #{method} not supported"
if options[:sniffing]
# We don't support sniffers that aren't threadsafe with timers here!
sniffer_class = options[:sniffer_class] ? options[:sniffer_class] : ManticoreSniffer
raise ArgumentError, "Sniffer class #{sniffer_class} must be a ManticoreSniffer!" if sniffer_class.nil? || !sniffer_class.ancestors.include?(ManticoreSniffer)
@sniffer = sniffer_class.new(self, logger)
@sniffer.sniff_every(options[:sniffer_delay] || 5) do |urls|
logger.info("Will update internal host pool with #{urls.inspect}")
@pool.update_urls(urls)
end
Response.new resp.code, resp.read_body, resp.headers
end
end

# Builds and returns a collection of connections.
# Each connection is a Manticore::Client
#
# @return [Connections::Collection]
#
def __build_connections
@request_options = {}
def perform_request(method, path, params={}, body=nil)
with_request_retries do
body = __convert_to_json(body) if body
enriching_response(method, path, params, body) do
url, response = @pool.perform_request(method, path, params, body)

if options.key?(:headers)
@request_options[:headers] = options[:headers]
# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
[url, response]
end
end

Connections::Collection.new \
:connections => hosts.map { |host|
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
host[:port] ||= DEFAULT_PORT

host.delete(:user) # auth is not supported here.
host.delete(:password) # use the headers

Connections::Connection.new \
:host => host,
:connection => @manticore
},
:selector_class => options[:selector_class],
:selector => options[:selector]
end

# Closes all connections by marking them as dead
# and closing the underlying HttpClient instances
#
# @return [Connections::Collection]
#
def __close_connections
@connections.each {|c| c.dead! }
@connections.all.each {|c| c.connection.close }
end

# Returns an array of implementation specific connection errors.
#
# @return [Array]
#
def host_unreachable_exceptions
[
::Manticore::Timeout,
::Manticore::SocketException,
::Manticore::ClientProtocolException,
::Manticore::ResolutionFailure
]
@adapter.manticore.close
end
end
end
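Finally, a hedged sketch of constructing the pooled Manticore transport with background sniffing, using only the option names read in this diff (:sniffing, :sniffer_delay, :retry_on_status, :logger); the Pool/Adapter wiring is still WIP, so treat this as approximate:

require 'logger'

transport = Elasticsearch::Transport::Transport::HTTP::Manticore.new(
  hosts: [{ host: 'localhost', port: 9200 }], # turned into pool URLs via URI::HTTP.build
  options: {
    logger: Logger.new($stderr),
    sniffing: true,        # starts a background ManticoreSniffer
    sniffer_delay: 10,     # seconds between sniffs (the diff defaults to 5)
    retry_on_status: [503]
  }
)
transport.perform_request 'GET', '_nodes/http'
transport.__close_connections # closes the underlying Manticore client via the adapter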