Skip to content

Commit 7be6ebd

Browse files
committed
Partial fixes based on PR review
1 parent 3851ceb commit 7be6ebd

File tree

7 files changed

+69
-72
lines changed

7 files changed

+69
-72
lines changed

elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ def initialize(arguments={}, &block)
4242
@logger = options[:logger]
4343
@tracer = options[:tracer]
4444

45-
sniffer_class = options[:sniffer_class] ? options[:sniffer_class] : Sniffer
46-
@sniffer = sniffer_class.new(self, logger)
45+
sniffer_class = options.fetch(:sniffer_class, Sniffer)
46+
@sniffer = sniffer_class.new(self, @logger)
4747
@counter = 0
4848
@counter_mtx = Mutex.new
4949
@last_request_at = Time.now
@@ -151,8 +151,8 @@ def __trace(method, path, params, body, url, response, json, took, duration)
151151
#
152152
# @api private
153153
def __raise_transport_error(response)
154-
error = ERRORS[response.status].new(response) || ServerError.new(response)
155-
raise error, "[#{response.status}] #{response.body}"
154+
error_class = ERRORS.fetch(response.status, ServerError)
155+
raise error_class.new(response), "[#{response.status}] #{response.body}"
156156
end
157157

158158
# Converts any non-String object to JSON
@@ -275,8 +275,9 @@ def perform_request(method, path, params={}, body=nil, &block)
275275
@last_request_at = Time.now
276276
end
277277

278-
def deserialize_response(response)
278+
def __deserialize_response(response)
279279
serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/
280+
280281
end
281282

282283
# @abstract Returns an Array of connection errors specific to the transport implementation.

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ def initialize(arguments={}, &block)
5454
@logger = options[:logger]
5555
@adapter = Adapter.new(logger, options)
5656
@healthcheck_path = options[:healthcheck_path] || "/"
57-
# TODO handle HTTPS
5857
normalized_hosts = arguments[:hosts].map {|h| normalize_host(h)}
5958
@pool = Manticore::Pool.new(logger, @adapter, @healthcheck_path, normalized_hosts)
6059
@protocol = options[:protocol] || DEFAULT_PROTOCOL
@@ -67,22 +66,22 @@ def initialize(arguments={}, &block)
6766

6867
def normalize_host(host)
6968
case host
70-
when URI
71-
host.to_s
72-
when String
73-
URI.parse(host).to_s
74-
when Hash
75-
host = host.clone
76-
host[:scheme] ||= (host[:scheme] || host[:protocol] || "http").to_s
77-
if host[:scheme] == 'http'
78-
URI::HTTP.build(host)
79-
elsif scheme == 'https'
80-
URI::HTTPS.build(host)
81-
else
82-
raise ArgumentError, "Unrecognized scheme for host #{host}"
83-
end
69+
when URI
70+
host
71+
when String
72+
URI.parse(host)
73+
when Hash
74+
host = host.clone
75+
host[:scheme] ||= (host[:scheme] || host[:protocol] || "http").to_s
76+
if host[:scheme] == 'http'
77+
URI::HTTP.build(host)
78+
elsif scheme == 'https'
79+
URI::HTTPS.build(host)
8480
else
85-
raise "Host parameter #{host} is not valid! Try something like 'http://localhost:9200'!"
81+
raise ArgumentError, "Unrecognized scheme for host #{host}"
82+
end
83+
else
84+
raise ArgumentError, "Host parameter #{host} is not valid! Try something like 'http://localhost:9200'!"
8685
end
8786
end
8887

@@ -109,16 +108,15 @@ def reload_connections!
109108
end
110109

111110
def perform_request(method, path, params={}, body=nil)
112-
with_request_retries do
113-
body = __convert_to_json(body) if body
114-
enriching_response(method, path, params, body) do
115-
url, response = @pool.perform_request(method, path, params, body)
116-
117-
# Raise an exception so we can catch it for `retry_on_status`
118-
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
119-
[url, response]
120-
end
111+
body = __convert_to_json(body) if body
112+
url, response = with_request_retries do
113+
url, response = @pool.perform_request(method, path, params, body)
114+
# Raise an exception so we can catch it for `retry_on_status`
115+
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
116+
[url, response]
121117
end
118+
119+
enrich_response(method, url, path, params, body, response)
122120
end
123121

124122
# This takes a host string to aid in debug logging
@@ -134,7 +132,7 @@ def with_request_retries
134132
if tries <= max_retries
135133
retry
136134
else
137-
logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
135+
logger.error "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
138136
raise e
139137
end
140138
else
@@ -147,7 +145,7 @@ def with_request_retries
147145
logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
148146
if tries <= max_retries
149147
if @options[:reload_on_failure] && pool.alive_urls_count == 0
150-
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
148+
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{max_retries})" if logger
151149
reload_connections!
152150
end
153151

@@ -171,13 +169,12 @@ def __close_connections
171169
logger.info("Sniffer closed.") if logger
172170
logger.info("Closing pool") if logger
173171
@pool.close # closes adapter as well
172+
logger.info("Pool closed") if logger
174173
end
175174

176-
def enriching_response(method, path, params, body)
175+
def enrich_response(method, url, path, params, body, response)
177176
start = Time.now if logger || tracer
178177

179-
url, response = yield
180-
181178
duration = Time.now-start if logger || tracer
182179

183180
if response.status.to_i >= 300
@@ -187,15 +184,17 @@ def enriching_response(method, path, params, body)
187184
__raise_transport_error response
188185
end
189186

190-
json = deserialize_response(response)
191-
192-
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer
193-
194-
__log method, path, params, body, url, response, json, took, duration if logger
195-
__trace method, path, params, body, url, response, json, took, duration if tracer
187+
json = __deserialize_response(response)
188+
if json
189+
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer
196190

191+
__log method, path, params, body, url, response, json, took, duration if logger
192+
__trace method, path, params, body, url, response, json, took, duration if tracer
193+
end
197194

198-
::Elasticsearch::Transport::Transport::Response.new response.status, json || response.body, response.headers
195+
# If the response wasn't JSON we just return it as a string
196+
data = json || response.body
197+
::Elasticsearch::Transport::Transport::Response.new response.status, data, response.headers
199198
end
200199
end
201200
end

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore/adapter.rb

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ module Transport
33
module Transport
44
module HTTP
55
class Manticore
6+
# While the Manticore Pool only supports Manticore today, one day other adapters
7+
# may use it. This class provides an abstraction around the actual HTTP client for
8+
# the pool.
69
class Adapter
710
attr_reader :manticore, :logger
811

@@ -31,18 +34,18 @@ def perform_request(url, method, path, params={}, body=nil)
3134
params[:body] = body if body
3235
url_and_path = (url + path).to_s # Convert URI object to string
3336
case method
34-
when "GET"
35-
resp = @manticore.get(url_and_path, params)
36-
when "HEAD"
37-
resp = @manticore.head(url_and_path, params)
38-
when "PUT"
39-
resp = @manticore.put(url_and_path, params)
40-
when "POST"
41-
resp = @manticore.post(url_and_path, params)
42-
when "DELETE"
43-
resp = @manticore.delete(url_and_path, params)
44-
else
45-
raise ArgumentError.new "Method #{method} not supported"
37+
when "GET"
38+
resp = @manticore.get(url_and_path, params)
39+
when "HEAD"
40+
resp = @manticore.head(url_and_path, params)
41+
when "PUT"
42+
resp = @manticore.put(url_and_path, params)
43+
when "POST"
44+
resp = @manticore.post(url_and_path, params)
45+
when "DELETE"
46+
resp = @manticore.delete(url_and_path, params)
47+
else
48+
raise ArgumentError.new "Method #{method} not supported"
4649
end
4750
Response.new resp.code, resp.read_body, resp.headers
4851
end

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore/manticore_sniffer.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@ class Manticore
66
# Handles node discovery ("sniffing")
77
#
88
class ManticoreSniffer < ::Elasticsearch::Transport::Transport::Sniffer
9-
ES1_RE_URL = /\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
10-
ES2_RE_URL = /([^\/]*)?\/?([^:]*):([0-9]+)/
11-
129
attr_reader :transport
1310
attr_accessor :timeout
1411

1512
# @param transport [Object] A transport instance
1613
#
1714
def initialize(*args)
18-
@timeout = 1
15+
@timeout = 1 # in seconds
1916
@state_mutex = Mutex.new
2017
@stopping = false
2118
super
@@ -42,15 +39,15 @@ def sniff_every(interval_seconds, &block)
4239
last_sniff = Time.now
4340
end
4441
rescue Exception => e
45-
logger.warn("Error while sniffing Elasticsearch Nodes! [#{e.class.name}][#{e.message}][#{e.backtrace.to_a}]") if logger
42+
logger.warn("Error while sniffing Elasticsearch Nodes! [#{e.class.name}][#{e.message}][#{e.backtrace.join("\n")}]") if logger
4643
end
4744
end
4845
end
4946
end
5047
end
5148

5249
def hosts
53-
nodes = transport.perform_request('GET', '_nodes/http', :timeout => timeout).body
50+
nodes = transport.perform_request('GET', '_nodes/http', :request_timeout => timeout).body
5451
urls = hosts_from_nodes(nodes).map {|n| "#{transport.protocol}://#{n["http_address"]}"}
5552
urls
5653
end

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore/pool.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ def initialize(logger, adapter, healthcheck_path="/", urls=[], resurrect_interva
1919
end
2020

2121
def close
22-
logger.info "Stopping resurrectionist" if logger
22+
logger.debug "Stopping resurrectionist" if logger
2323
stop_resurrectionist
24-
logger.info "Waiting for open manticore connections" if logger
24+
logger.debug "Waiting for open manticore connections" if logger
2525
wait_for_open_connections
26-
logger.info("Closing adapter #{@adapter}") if logger
26+
logger.debug("Closing adapter #{@adapter}") if logger
2727
@adapter.close
2828
end
2929

3030
def wait_for_open_connections
3131
until open_connections.empty?
32-
logger.info "Blocked on shutdown to to open connections #{@state_mutex.synchronize {@url_info}}" if logger
32+
logger.info "Blocked on shutdown to open connections #{@state_mutex.synchronize {@url_info}}" if logger
3333
sleep 1
3434
end
3535
end
@@ -58,7 +58,7 @@ def resurrect_dead!
5858
# Try to keep locking granularity low such that we don't affect IO...
5959
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:dead] } }.each do |url,meta|
6060
begin
61-
@logger.info("Checking url #{url} with path #{@healthcheck_path} to see if node resurrected! ")
61+
@logger.info("Checking url #{url} with path #{@healthcheck_path} to see if node resurrected")
6262
perform_request_to_url(url, "GET", @healthcheck_path)
6363
# If no exception was raised it must have succeeded!
6464
logger.warn("Resurrected connection to dead ES instance at #{url}")
@@ -81,7 +81,7 @@ def perform_request(method, path, params={}, body=nil)
8181
end
8282

8383
def perform_request_to_url(url, method, path, params={}, body=nil)
84-
raise ArgumentError, "No URL specified!" unless url
84+
raise ArgumentError, "No URL specified" unless url
8585

8686
@adapter.perform_request(url, method, path, params, body)
8787
rescue ::Manticore::Timeout,::Manticore::SocketException, ::Manticore::ClientProtocolException, ::Manticore::ResolutionFailure => e
@@ -131,7 +131,7 @@ def empty_url_meta
131131
def with_connection
132132
url, url_meta = get_connection
133133

134-
raise Error, "No Available connections!" unless url
134+
raise Error, "No Available connections" unless url
135135
yield url
136136
rescue ::Elasticsearch::Transport::Transport::HostUnreachableError => e
137137
mark_dead(url, e)
@@ -143,7 +143,7 @@ def with_connection
143143
def mark_dead(url, error)
144144
@state_mutex.synchronize do
145145
url_meta = @url_info[url]
146-
logger.warn("Marking url #{url} as dead! Last error: [#{error.class}] #{error.message}")
146+
logger.warn("Marking url #{url} as dead. Last error: [#{error.class}] #{error.message}")
147147
url_meta[:dead] = true
148148
url_meta[:last_error] = error
149149
end
@@ -160,7 +160,7 @@ def get_connection
160160
# The goal here is to pick a random connection from the least-in-use connections
161161
# We want some randomness so that we don't hit the same node over and over, but
162162
# we also want more 'fair' behavior in the event of high concurrency
163-
raise "No available connections!" if @url_info.empty?
163+
raise "No available connections." if @url_info.empty?
164164

165165
eligible_set = nil
166166
lowest_value_seen = nil

elasticsearch-transport/lib/elasticsearch/transport/transport/sniffer.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ def hosts
3535
end
3636
end
3737

38-
protected
39-
4038
def hosts_from_nodes(nodes)
4139
hosts = nodes['nodes'].map do |id,info|
4240
addr_str = info["#{transport.protocol}_address"].to_s
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
module Elasticsearch
22
module Transport
33
VERSION = "1.0.16.pre"
4-
#VERSION = "1.0.16.pre"
54
end
65
end

0 commit comments

Comments
 (0)