Skip to content

Commit 5abd114

Browse files
committed
Work toward threadsafety
Fix off-by-one in comparison for resurrecton (fixes jruby test failures) Use jruby in test matrix Don't test on 1.8.7 anymore Fixed retry logic / added tests for manticore Work toward pool tests Round out tests for manticore sniffer Clarify code around normalizing host parameters Fix healtcheck path option Partial fixes based on PR review Incorporate more PR feedback
1 parent 47258c9 commit 5abd114

File tree

14 files changed

+748
-127
lines changed

14 files changed

+748
-127
lines changed

.travis.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,10 @@ branches:
1414
- travis
1515

1616
rvm:
17-
- 1.8.7
1817
- 1.9.3
1918
- 2.1
2019
- 2.2
21-
22-
jdk:
23-
- openjdk7
20+
- jruby-1.7.24
2421

2522
env:
2623
- TEST_SUITE=unit

elasticsearch-transport/lib/elasticsearch/transport/client.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ def perform_request(method, path, params={}, body=nil)
128128
transport.perform_request method, path, params, body
129129
end
130130

131+
# Shuts down the client
132+
def close
133+
@transport.__close_connections
134+
end
135+
131136
# Normalizes and returns hosts configuration.
132137
#
133138
# Arrayifies the `hosts_config` argument and extracts `host` and `port` info from strings.

elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def initialize(arguments={}, &block)
4444
@logger = options[:logger]
4545
@tracer = options[:tracer]
4646

47-
@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
47+
sniffer_class = options.fetch(:sniffer_class, Sniffer)
48+
@sniffer = sniffer_class.new(self, @logger)
4849
@counter = 0
4950
@counter_mtx = Mutex.new
5051
@last_request_at = Time.now
@@ -165,8 +166,10 @@ def __close_connections
165166
#
166167
def __log(method, path, params, body, url, response, json, took, duration)
167168
sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
169+
168170
logger.info "#{method.to_s.upcase} #{sanitized_url} " +
169171
"[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
172+
170173
logger.debug "> #{__convert_to_json(body)}" if body
171174
logger.debug "< #{response.body}"
172175
end
@@ -197,8 +200,8 @@ def __trace(method, path, params, body, url, response, json, took, duration)
197200
# @api private
198201
#
199202
def __raise_transport_error(response)
200-
error = ERRORS[response.status] || ServerError
201-
raise error.new "[#{response.status}] #{response.body}"
203+
error_class = ERRORS.fetch(response.status, ServerError)
204+
raise error_class.new(response), "[#{response.status}] #{response.body}"
202205
end
203206

204207
# Converts any non-String object to JSON
@@ -260,8 +263,7 @@ def perform_request(method, path, params={}, body=nil, &block)
260263

261264
# Raise an exception so we can catch it for `retry_on_status`
262265
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
263-
264-
rescue Elasticsearch::Transport::Transport::ServerError => e
266+
rescue ::Elasticsearch::Transport::Transport::ServerError => e
265267
if @retry_on_status.include?(response.status)
266268
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
267269
logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
@@ -323,6 +325,11 @@ def perform_request(method, path, params={}, body=nil, &block)
323325
@last_request_at = Time.now
324326
end
325327

328+
def __deserialize_response(response)
329+
serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/
330+
331+
end
332+
326333
# @abstract Returns an Array of connection errors specific to the transport implementation.
327334
# See {HTTP::Faraday#host_unreachable_exceptions} for an example.
328335
#

elasticsearch-transport/lib/elasticsearch/transport/transport/connections/connection.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def resurrect!
114114
#
115115
def resurrectable?
116116
@state_mutex.synchronize {
117-
Time.now > @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) )
117+
Time.now >= @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) )
118118
}
119119
end
120120

elasticsearch-transport/lib/elasticsearch/transport/transport/errors.rb

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,26 @@ class SnifferTimeoutError < Timeout::Error; end
1212

1313
# Elasticsearch server error (HTTP status 5xx)
1414
#
15-
class ServerError < Error; end
15+
class ServerError < Error;
16+
attr_reader :response
17+
18+
def initialize(response)
19+
@response = response
20+
end
21+
end
22+
23+
class HostUnreachableError < Error;
24+
attr_reader :original_error, :url
25+
26+
def initialize(original_error, url)
27+
@original_error = original_error
28+
@url = url
29+
end
30+
31+
def message
32+
"[#{original_error.class}] #{original_error.message}"
33+
end
34+
end
1635

1736
module Errors; end
1837

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb

Lines changed: 134 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
require 'manticore'
2+
require "elasticsearch/transport/transport/http/manticore/pool"
3+
require "elasticsearch/transport/transport/http/manticore/adapter"
4+
require "elasticsearch/transport/transport/http/manticore/manticore_sniffer"
25

36
module Elasticsearch
47
module Transport
@@ -43,97 +46,159 @@ module HTTP
4346
# @see Transport::Base
4447
#
4548
class Manticore
49+
attr_reader :pool, :adapter, :options
4650
include Base
4751

4852
def initialize(arguments={}, &block)
49-
@manticore = build_client(arguments[:options] || {})
50-
super(arguments, &block)
51-
end
52-
53-
# Should just be run once at startup
54-
def build_client(options={})
55-
client_options = options[:transport_options] || {}
56-
client_options[:ssl] = options[:ssl] || {}
53+
@options = arguments[:options] || {}
54+
@logger = options[:logger]
55+
@adapter = Adapter.new(logger, options)
56+
@healthcheck_path = options[:healthcheck_path] || "/"
57+
normalized_hosts = (arguments[:hosts] || []).map {|h| normalize_host(h)}
58+
@pool = Manticore::Pool.new(logger, @adapter, @healthcheck_path, normalized_hosts)
59+
@protocol = options[:protocol] || DEFAULT_PROTOCOL
60+
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
61+
@max_retries = options[:retry_on_failure].is_a?(Fixnum) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
62+
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
5763

58-
@manticore = ::Manticore::Client.new(client_options)
64+
setup_sniffing!
5965
end
6066

61-
# Performs the request by invoking {Transport::Base#perform_request} with a block.
62-
#
63-
# @return [Response]
64-
# @see Transport::Base#perform_request
65-
#
66-
def perform_request(method, path, params={}, body=nil)
67-
super do |connection, url|
68-
params[:body] = __convert_to_json(body) if body
69-
params = params.merge @request_options
70-
case method
71-
when "GET"
72-
resp = connection.connection.get(url, params)
73-
when "HEAD"
74-
resp = connection.connection.head(url, params)
75-
when "PUT"
76-
resp = connection.connection.put(url, params)
77-
when "POST"
78-
resp = connection.connection.post(url, params)
79-
when "DELETE"
80-
resp = connection.connection.delete(url, params)
67+
def normalize_host(host)
68+
case host
69+
when URI
70+
host
71+
when String
72+
URI.parse(host)
73+
when Hash
74+
host = host.clone
75+
host[:scheme] ||= (host[:scheme] || host[:protocol] || "http").to_s
76+
if host[:scheme] == 'http'
77+
URI::HTTP.build(host)
78+
elsif scheme == 'https'
79+
URI::HTTPS.build(host)
8180
else
82-
raise ArgumentError.new "Method #{method} not supported"
81+
raise ArgumentError, "Unrecognized scheme for host #{host}"
8382
end
84-
Response.new resp.code, resp.read_body, resp.headers
83+
else
84+
raise ArgumentError, "Host parameter #{host} is not valid! Try something like 'http://localhost:9200'!"
8585
end
8686
end
8787

88-
# Builds and returns a collection of connections.
89-
# Each connection is a Manticore::Client
90-
#
91-
# @return [Connections::Collection]
92-
#
93-
def __build_connections
94-
@request_options = {}
88+
def setup_sniffing!
89+
if options[:sniffing] || options[:reload_connections]
90+
# We don't support sniffers that aren't threadsafe with timers here!
91+
sniffer_class = options[:sniffer_class] ? options[:sniffer_class] : ::Elasticsearch::Transport::Transport::HTTP::Manticore::ManticoreSniffer
92+
raise ArgumentError, "Sniffer class #{sniffer_class} must be a ManticoreSniffer!" if sniffer_class.nil? || !sniffer_class.ancestors.include?(::Elasticsearch::Transport::Transport::HTTP::Manticore::ManticoreSniffer)
93+
@sniffer = sniffer_class.new(self, logger)
94+
@sniffer.sniff_every(options[:sniffer_delay] || 5) do |urls|
95+
logger.info("Will update internal host pool with #{urls.inspect}")
96+
@pool.update_urls(urls)
97+
end
98+
end
99+
end
95100

96-
if options.key?(:headers)
97-
@request_options[:headers] = options[:headers]
101+
# Sniff (if enabled) to get the newest list of hosts
102+
# then attempt to resurrect any dead URLs
103+
def reload_connections!
104+
if options[:sniffing]
105+
@pool.update_urls(@sniffer.hosts)
98106
end
107+
@pool.resurrect_dead!
108+
end
99109

100-
Connections::Collection.new \
101-
:connections => hosts.map { |host|
102-
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
103-
host[:port] ||= DEFAULT_PORT
110+
def perform_request(method, path, params={}, body=nil)
111+
body = __convert_to_json(body) if body
112+
url, response = with_request_retries do
113+
url, response = @pool.perform_request(method, path, params, body)
114+
# Raise an exception so we can catch it for `retry_on_status`
115+
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
116+
[url, response]
117+
end
118+
119+
enrich_response(method, url, path, params, body, response)
120+
end
121+
122+
# This takes a host string to aid in debug logging
123+
def with_request_retries
124+
tries = 0
125+
begin
126+
tries += 1
127+
yield
128+
rescue ::Elasticsearch::Transport::Transport::ServerError => e
129+
if @retry_on_status.include?(e.response.status)
130+
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
131+
logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
132+
if tries <= max_retries
133+
retry
134+
else
135+
logger.error "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
136+
raise e
137+
end
138+
else
139+
raise e
140+
end
141+
rescue ::Elasticsearch::Transport::Transport::HostUnreachableError => e
142+
logger.error "[#{e.class}] #{e.message} #{e.url}" if logger
104143

105-
host.delete(:user) # auth is not supported here.
106-
host.delete(:password) # use the headers
144+
if @options[:retry_on_failure]
145+
logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
146+
if tries <= max_retries
147+
if @options[:reload_on_failure] && pool.alive_urls_count == 0
148+
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{max_retries})" if logger
149+
reload_connections!
150+
end
107151

108-
Connections::Connection.new \
109-
:host => host,
110-
:connection => @manticore
111-
},
112-
:selector_class => options[:selector_class],
113-
:selector => options[:selector]
152+
retry
153+
else
154+
logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
155+
raise e
156+
end
157+
end
158+
rescue Exception => e
159+
logger.fatal "[#{e.class}] #{e.message} ()" if logger
160+
raise e
161+
end
114162
end
115163

116-
# Closes all connections by marking them as dead
117-
# and closing the underlying HttpClient instances
118-
#
119-
# @return [Connections::Collection]
120-
#
121164
def __close_connections
122-
# The Manticore adapter uses a single long-lived instance
123-
# of Manticore::Client, so we don't close the connections.
165+
if @sniffer
166+
logger.info("Closing sniffer...") if logger
167+
@sniffer.close
168+
end
169+
logger.info("Sniffer closed.") if logger
170+
logger.info("Closing pool") if logger
171+
@pool.close # closes adapter as well
172+
logger.info("Pool closed") if logger
173+
end
174+
175+
def enrich_response(method, url, path, params, body, response)
176+
start = Time.now if logger || tracer
177+
178+
duration = Time.now-start if logger || tracer
179+
180+
if response.status.to_i >= 300
181+
__log method, path, params, body, url, response, nil, 'N/A', duration if logger
182+
__trace method, path, params, body, url, response, nil, 'N/A', duration if tracer
183+
__log_failed response if logger
184+
__raise_transport_error response
185+
end
186+
187+
json = __deserialize_response(response)
188+
if json
189+
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer
190+
191+
__log method, path, params, body, url, response, json, took, duration if logger
192+
__trace method, path, params, body, url, response, json, took, duration if tracer
193+
end
194+
195+
# If the response wasn't JSON we just return it as a string
196+
data = json || response.body
197+
::Elasticsearch::Transport::Transport::Response.new response.status, data, response.headers
124198
end
125199

126-
# Returns an array of implementation specific connection errors.
127-
#
128-
# @return [Array]
129-
#
130200
def host_unreachable_exceptions
131-
[
132-
::Manticore::Timeout,
133-
::Manticore::SocketException,
134-
::Manticore::ClientProtocolException,
135-
::Manticore::ResolutionFailure
136-
]
201+
[::Manticore::Timeout,::Manticore::SocketException, ::Manticore::ClientProtocolException, ::Manticore::ResolutionFailure]
137202
end
138203
end
139204
end

0 commit comments

Comments
 (0)