Skip to content

Commit 7f769fb

Browse files
committed
Checkpoint
1 parent 20c1b6f commit 7f769fb

File tree

4 files changed

+242
-105
lines changed

4 files changed

+242
-105
lines changed

elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,10 @@ def perform_request(method, path, params={}, body=nil, &block)
273273
@last_request_at = Time.now
274274
end
275275

276+
def perform_with_retry
277+
278+
end
279+
276280
# @abstract Returns an Array of connection errors specific to the transport implementation.
277281
# See {HTTP::Faraday#host_unreachable_exceptions} for an example.
278282
#

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb

Lines changed: 50 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'manticore'
2+
require "elasticsearch/transport/transport/http/manticore/pool"
23

34
module Elasticsearch
45
module Transport
@@ -43,97 +44,73 @@ module HTTP
4344
# @see Transport::Base
4445
#
4546
class Manticore
47+
attr_reader :pool, :adapter
4648
include Base
4749

48-
def initialize(arguments={}, &block)
49-
@manticore = build_client(arguments[:options] || {})
50-
super(arguments, &block)
51-
end
50+
class Adapter
51+
attr_reader :manticore
5252

53-
# Should just be run once at startup
54-
def build_client(options={})
55-
client_options = options[:transport_options] || {}
56-
client_options[:ssl] = options[:ssl] || {}
53+
def initialize(options)
54+
build_client(options || {})
55+
end
5756

58-
@manticore = ::Manticore::Client.new(client_options)
59-
end
57+
# Should just be run once at startup
58+
def build_client(options={})
59+
client_options = options[:transport_options] || {}
60+
client_options[:ssl] = options[:ssl] || {}
6061

61-
# Performs the request by invoking {Transport::Base#perform_request} with a block.
62-
#
63-
# @return [Response]
64-
# @see Transport::Base#perform_request
65-
#
66-
def perform_request(method, path, params={}, body=nil)
67-
super do |connection, url|
68-
params[:body] = __convert_to_json(body) if body
62+
@request_options = options[:headers] ? {:headers => options[:headers]} : {}
63+
@manticore = ::Manticore::Client.new(client_options)
64+
end
65+
66+
# Performs the request by invoking {Transport::Base#perform_request} with a block.
67+
#
68+
# @return [Response]
69+
# @see Transport::Base#perform_request
70+
#
71+
def perform_request(url, method, path, params={}, body=nil)
6972
params = params.merge @request_options
73+
params[:body] = body if body
74+
url_and_path = url + path
7075
case method
71-
when "GET"
72-
resp = connection.connection.get(url, params)
73-
when "HEAD"
74-
resp = connection.connection.head(url, params)
75-
when "PUT"
76-
resp = connection.connection.put(url, params)
77-
when "POST"
78-
resp = connection.connection.post(url, params)
79-
when "DELETE"
80-
resp = connection.connection.delete(url, params)
81-
else
82-
raise ArgumentError.new "Method #{method} not supported"
76+
when "GET"
77+
resp = @manticore.get(url_and_path, params)
78+
when "HEAD"
79+
resp = @manticore.head(url_and_path, params)
80+
when "PUT"
81+
resp = @manticore.put(url_and_path, params)
82+
when "POST"
83+
resp = @manticore.post(url_and_path, params)
84+
when "DELETE"
85+
resp = @manticore.delete(url_and_path, params)
86+
else
87+
raise ArgumentError.new "Method #{method} not supported"
8388
end
8489
Response.new resp.code, resp.read_body, resp.headers
8590
end
86-
end
87-
88-
# Builds and returns a collection of connections.
89-
# Each connection is a Manticore::Client
90-
#
91-
# @return [Connections::Collection]
92-
#
93-
def __build_connections
94-
@request_options = {}
9591

96-
if options.key?(:headers)
97-
@request_options[:headers] = options[:headers]
92+
def close
93+
@manticore.close
9894
end
9995

100-
Connections::Collection.new \
101-
:connections => hosts.map { |host|
102-
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
103-
host[:port] ||= DEFAULT_PORT
104-
105-
host.delete(:user) # auth is not supported here.
106-
host.delete(:password) # use the headers
96+
def add_url(url)
97+
end
10798

108-
Connections::Connection.new \
109-
:host => host,
110-
:connection => @manticore
111-
},
112-
:selector_class => options[:selector_class],
113-
:selector => options[:selector]
99+
def remove_url(url)
100+
end
114101
end
115102

116-
# Closes all connections by marking them as dead
117-
# and closing the underlying HttpClient instances
118-
#
119-
# @return [Connections::Collection]
120-
#
121-
def __close_connections
122-
@connections.each {|c| c.dead! }
123-
@connections.all.each {|c| c.connection.close }
103+
def initialize(arguments={}, &block)
104+
@adapter = Adapter.new(arguments[:options])
105+
# TODO handle HTTPS
106+
@pool = Manticore::Pool.new(@adapter, arguments[:hosts].map {|h| URI::HTTP.build(h).to_s})
107+
options = arguments[:options] || {}
108+
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
124109
end
125110

126-
# Returns an array of implementation specific connection errors.
127-
#
128-
# @return [Array]
129-
#
130-
def host_unreachable_exceptions
131-
[
132-
::Manticore::Timeout,
133-
::Manticore::SocketException,
134-
::Manticore::ClientProtocolException,
135-
::Manticore::ResolutionFailure
136-
]
111+
def perform_request(method, path, params={}, body=nil)
112+
body = __convert_to_json(body) if body
113+
@pool.perform_request(method, path, params, body)
137114
end
138115
end
139116
end
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
module Elasticsearch
2+
module Transport
3+
module Transport
4+
module HTTP
5+
class Manticore
6+
class Pool
7+
class FatalURLError < StandardError; end
8+
9+
def initialize(client_wrapper, urls=[], resurrect_interval=5)
10+
@state_mutex = Mutex.new
11+
@url_info = {}
12+
@client = client_wrapper
13+
@stopping = false
14+
@resurrect_interval = resurrect_interval
15+
@resurrectionist = start_resurrectionist
16+
update_urls(urls)
17+
end
18+
19+
def close
20+
stop_resurrectionist
21+
wait_for_open_connections
22+
@client.close
23+
end
24+
25+
def wait_for_open_connections
26+
until open_connections == 0
27+
sleep 1
28+
end
29+
end
30+
31+
def open_connections
32+
@state_mutex.synchronize { @url_info.values.map {|v| v[:open] }}
33+
end
34+
35+
def start_resurrectionist
36+
Thread.new do
37+
slept_for = 0
38+
loop do
39+
sleep 1
40+
slept_for += 1
41+
42+
break if @state_mutex.synchronize { @stopping }
43+
44+
if slept_for >= @resurrect_interval
45+
slept_for = 0
46+
resurrect_dead!
47+
end
48+
end
49+
end
50+
end
51+
52+
def resurrect_dead!
53+
# Try to keep locking granularity low such that we don't affect IO...
54+
@state_mutex.synchronize { @url_info.select {|u,m| m[:dead] } }.each do |url,meta|
55+
begin
56+
@client.perform_request(url, :get, "/")
57+
# If no exception was raised it must have succeeded!
58+
@state_mutex.synchronize { m[:dead] = false }
59+
rescue FatalURLError => e
60+
# NOOP, we'll just try this another time...
61+
end
62+
end
63+
end
64+
65+
def stop_resurrectionist
66+
@state_mutex.synchronize { @stopping = true }
67+
@resurrectionist.join # Wait for thread to stop
68+
end
69+
70+
def perform_request(method, path, params={}, body=nil)
71+
with_connection do |url|
72+
begin
73+
@client.perform_request(url, method, path, params, body)
74+
rescue ::Manticore::Timeout,::Manticore::SocketException, ::Manticore::ClientProtocolException, ::Manticore::ResolutionFailure => e
75+
logger.error "[#{e.class}] #{e.message} #{url}" if logger
76+
raise FatalURLError, "Could not reach host #{e.class}: #{e.message}"
77+
end
78+
end
79+
end
80+
81+
def update_urls(new_urls)
82+
@state_mutex.synchronize do
83+
# Add new connections
84+
new_urls.each do |url|
85+
add_url(url)
86+
end
87+
88+
# Delete connections not in the new list
89+
@url_info.each do |url,_|
90+
remove_url(url) unless new_urls.include?(url)
91+
end
92+
end
93+
end
94+
95+
def add_url(url)
96+
@client.add_url(url)
97+
@url_info[url] ||= empty_url_meta
98+
end
99+
100+
def remove_url(url)
101+
@client.remove_url(url)
102+
@url_info.delete(url)
103+
end
104+
105+
def empty_url_meta
106+
{
107+
:open => 0,
108+
:dead => false
109+
}
110+
end
111+
112+
def with_connection
113+
url, url_meta = get_connection
114+
yield url
115+
rescue FatalURLError => e
116+
@state_mutex.synchronize do
117+
url_meta[:dead] = true
118+
url_meta[:last_error] = e
119+
end
120+
ensure
121+
return_connection(url)
122+
end
123+
124+
def get_connection
125+
@state_mutex.synchronize do
126+
# The goal here is to pick a random connection from the least-in-use connections
127+
# We want some randomness so that we don't hit the same node over and over, but
128+
# we also want more 'fair' behavior in the event of high concurrency
129+
raise "No available connections!" if @url_info.empty?
130+
131+
eligible_set = nil
132+
lowest_value_seen = nil
133+
@url_info.each do |url,meta|
134+
meta_open = meta[:open]
135+
next if meta[:dead]
136+
137+
if lowest_value_seen.nil? || meta_open < lowest_value_seen
138+
lowest_value_seen = meta_open
139+
eligible_set = [[url, meta]]
140+
elsif lowest_value_seen == meta_open
141+
eligible_set << [url, meta]
142+
end
143+
end
144+
145+
return nil if eligible_set.nil?
146+
147+
pick_and_meta = eligible_set.sample
148+
pick, pick_meta = pick_and_meta
149+
pick_meta[:open] += 1
150+
151+
pick_and_meta
152+
end
153+
end
154+
155+
def return_connection(url)
156+
@state_mutex.synchronize do
157+
if @url_info[url] # Guard against the condition where the connection has already been deleted
158+
@url_info[url][:open] -= 1
159+
end
160+
end
161+
end
162+
end
163+
end
164+
end
165+
end
166+
end
167+
end

0 commit comments

Comments
 (0)