Skip to content

Commit 5f97cd4

Browse files
committed
Checkpoint, better pool class
1 parent 0466d4b commit 5f97cd4

File tree

6 files changed

+222
-19
lines changed

6 files changed

+222
-19
lines changed

elasticsearch-transport/elasticsearch-transport.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ Gem::Specification.new do |s|
4848
s.add_development_dependency "patron" unless defined? JRUBY_VERSION
4949
s.add_development_dependency "typhoeus", '~> 0.6'
5050
s.add_development_dependency "net-http-persistent"
51-
s.add_development_dependency "manticore", '~> 0.3.5' if defined? JRUBY_VERSION
51+
s.add_development_dependency "manticore", '~> 0.5.2' if defined? JRUBY_VERSION
5252
s.add_development_dependency "hashie"
5353

5454
# Prevent unit test failures on Ruby 1.8

elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def initialize(arguments={}, &block)
3333
@hosts = arguments[:hosts] || []
3434
@options = arguments[:options] || {}
3535
@block = block
36-
@connections = __build_connections
36+
@connections = Connections::Collection.new(:connections => __build_connections)
3737

3838
@serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
3939
@protocol = options[:protocol] || DEFAULT_PROTOCOL

elasticsearch-transport/lib/elasticsearch/transport/transport/connections/collection.rb

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
require 'concurrent'
22
require 'thread'
33

4+
5+
# 1 manticore instance
6+
# Our own connection manager
7+
# Pick the least in-use connection
8+
# attempt to resurrect a dead connection if none are alive
9+
# in the background periodically attempt to resurrect dead connections
10+
#
11+
412
module Elasticsearch
513
module Transport
614
module Transport
@@ -25,9 +33,12 @@ class Collection
2533
#
2634
def initialize(arguments={})
2735
selector_class = arguments[:selector_class] || DEFAULT_SELECTOR
28-
@connections = Concurrent::Map.new
29-
@connections = arguments[:connections] || []
30-
@selector = arguments[:selector] || selector_class.new(arguments.merge(:connections => self))
36+
37+
@connection_checkouts = {}
38+
(arguments[:connections] || []).each do |conn|
39+
@connection_checkouts[conn] = 0
40+
end
41+
3142
@state_mutex = Mutex.new
3243
end
3344

@@ -37,14 +48,18 @@ def initialize(arguments={})
3748
def update(new_connections)
3849
new_connections.each {|connection| add_connection(connection)}
3950

40-
@connections.each do |connection,_|
41-
@connections.delete(connection) unless new_connections.include?(connection)
42-
connection.close
51+
@state_mutex.synchronize do
52+
@connections.each do |connection,_|
53+
@connections.delete(connection) unless new_connections.include?(connection)
54+
connection.close
55+
end
4356
end
4457
end
4558

4659
def add_connection(connection)
47-
@connections.put_if_absent(connection, true)
60+
@state_mutex.synchronize do
61+
@connections[connection] = 0 unless @connections[connection]
62+
end
4863
end
4964
alias :<< :add_connection
5065

@@ -53,15 +68,19 @@ def add_connection(connection)
5368
# @return [Array]
5469
#
5570
def hosts
56-
@connections.to_a.map { |c| c.host }
71+
@state_mutex.synchronize do
72+
@connections.to_a.map { |c| c.host }
73+
end
5774
end
5875

5976
# Returns an Array of alive connections.
6077
#
6178
# @return [Array]
6279
#
6380
def connections
64-
@connections.keys.reject { |c| c.dead? }
81+
@state_mutex.synchronize do
82+
@connections.keys.reject { |c| c.dead? }
83+
end
6584
end
6685
alias :alive :connections
6786

@@ -70,15 +89,26 @@ def connections
7089
# @return [Array]
7190
#
7291
def dead
73-
@connections.keys.select { |c| c.dead? }
92+
@state_mutex.synchronize do
93+
dead_unsafe
94+
end
7495
end
7596

7697
# Returns an Array of all connections, both dead and alive
7798
#
7899
# @return [Array]
79100
#
80101
def all
81-
@connections.keys
102+
@state_mutex.synchronize do
103+
@connections.keys
104+
end
105+
end
106+
107+
def with_connection(options={})
108+
connection = get_connection(options)
109+
yield(connection)
110+
ensure
111+
return_connection(connection) if connection
82112
end
83113

84114
# Returns a connection.
@@ -89,10 +119,31 @@ def all
89119
# @return [Connection]
90120
#
91121
def get_connection(options={})
92-
if connections.empty? && (dead_connection = dead.sort { |a,b| a.failures <=> b.failures }.first)
93-
dead_connection.alive!
122+
@state_mutex.synchronize do
123+
if @connections.empty?
124+
if (dead_connection = dead_unsafe.keys.sort { |a,b| a.failures <=> b.failures }.first)
125+
dead_connection.alive!
126+
end
127+
end
128+
129+
# It'd be great if we had a concurrent version of a ConcurrentSkipListMap but we don't :(
130+
min_kv = @connections.min_by {|k,v| v}
131+
if min_kv
132+
connection, count = min_kv
133+
@connections[connection] = count + 1
134+
connection
135+
else
136+
nil
137+
end
138+
end
139+
end
140+
141+
def return_connection(connection)
142+
@state_mutex.synchronize do
143+
if (count = @connections[connection])
144+
@connections[connection] = count - 1
145+
end
94146
end
95-
selector.select(options)
96147
end
97148

98149
def each(&block)
@@ -106,8 +157,12 @@ def [](key)
106157
def size
107158
connections.size
108159
end
109-
end
160+
end
110161

162+
private
163+
def dead_unsafe
164+
@connections.keys.select { |c| c.dead? }
165+
end
111166
end
112167
end
113168
end

elasticsearch-transport/lib/elasticsearch/transport/transport/connections/selector.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'concurrent'
2+
13
module Elasticsearch
24
module Transport
35
module Transport
@@ -44,6 +46,10 @@ def select(options={})
4446
class RoundRobin
4547
include Base
4648

49+
def initialize
50+
@current_index
51+
end
52+
4753
# Returns the next connection from the collection, rotating them in round-robin fashion.
4854
#
4955
# @return [Connections::Connection]
@@ -53,6 +59,7 @@ def select(options={})
5359
@current = !defined?(@current) || @current.nil? ? 0 : @current+1
5460
@current = 0 if @current >= connections.size
5561
connections[@current]
62+
require 'pry'; binding.pry
5663
end
5764
end
5865

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class Manticore
4747

4848
def initialize(arguments={}, &block)
4949
@manticore = build_client(arguments[:options] || {})
50-
super()
50+
super
5151
end
5252

5353
# Should just be run once at startup
@@ -115,7 +115,9 @@ def __build_connections
115115
#
116116
def __close_connections
117117
@connections.each {|c| c.dead! }
118-
@connections.all.each {|c| c.connection.close }
118+
# We don't close the actual manticore client, but rather
119+
# just let the pool mechanics close it for us.
120+
# It uses the pooling functionality from Apache HC https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
119121
end
120122

121123
# Returns an array of implementation specific connection errors.
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
module Elasticsearch
  module Transport
    module Transport
      # Tracks a set of URLs served through a single shared client wrapper.
      #
      # For every URL the pool records how many requests are currently open
      # against it (`:open`) and whether it is considered dead (`:dead`).
      # Requests are routed to a random URL among the live URLs with the fewest
      # open requests. A background thread periodically re-checks dead URLs
      # and marks them alive again when a request to them succeeds.
      #
      # The client wrapper must respond to `add_url(url)`, `remove_url(url)` and
      # `perform_request(url, method, path, params, body, &block)`.
      # It is expected to signal a failed request by raising {RequestFailure}.
      #
      class Pool
        # Signals a failed request against a URL. Rescued by the pool to mark
        # the URL as dead.
        class RequestFailure < StandardError; end

        # @param client_wrapper     [Object]  The object requests are delegated to (see class docs)
        # @param resurrect_interval [Numeric] Roughly how many seconds to wait between
        #                                     attempts to resurrect dead URLs
        #
        def initialize(client_wrapper, resurrect_interval)
          @state_mutex = Mutex.new
          @url_info = {}
          @client = client_wrapper
          @stopping = false
          @resurrect_interval = resurrect_interval
          @resurrectionist = start_resurrectionist
        end

        # Starts the background thread which calls {#resurrect_dead!} every
        # `resurrect_interval` seconds.
        #
        # The thread wakes up once per second so that a stop request is noticed
        # within about a second, regardless of the configured interval.
        #
        # @return [Thread]
        #
        def start_resurrectionist
          Thread.new do
            slept_for = 0
            loop do
              sleep 1
              # Leave the loop (and thereby finish the thread) once asked to stop,
              # so that `stop_resurrectionist` can join it.
              break if @state_mutex.synchronize { @stopping }

              slept_for += 1
              next if slept_for < @resurrect_interval

              slept_for = 0
              resurrect_dead!
            end
          end
        end

        # Issues a `GET /` against every URL currently marked as dead and marks
        # the URL as alive again when the request does not raise {RequestFailure}.
        #
        def resurrect_dead!
          # Only take the lock to snapshot the dead URLs and to flip the flag,
          # never while performing IO.
          dead_urls = @state_mutex.synchronize { @url_info.select { |_, meta| meta[:dead] } }

          dead_urls.each do |url, meta|
            begin
              @client.perform_request(url, :get, "/")
              # If no exception was raised, the request succeeded.
              @state_mutex.synchronize { meta[:dead] = false }
            rescue RequestFailure
              # Still dead; it will be retried on the next run.
            end
          end
        end

        # Asks the background thread to stop and waits for it to finish.
        #
        def stop_resurrectionist
          @state_mutex.synchronize { @stopping = true }
          @resurrectionist.join # Wait for thread to stop
        end

        # Performs a request against one of the live URLs in the pool.
        #
        # @param method [Symbol,String] The HTTP method
        # @param path   [String]        The request path
        # @param params [Hash]          The request parameters
        # @param body   [Object]        The request body
        #
        # @return The value returned by the client wrapper
        # @raise [RequestFailure] When the request fails or no live URL is available
        #
        def perform_request(method, path, params={}, body=nil, &block)
          with_connection do |url|
            @client.perform_request(url, method, path, params, body, &block)
          end
        end

        # Makes the pool track exactly the URLs in `new_urls`: URLs not yet known
        # are added, URLs no longer listed are removed. State of URLs present
        # both before and after is kept.
        #
        # @param new_urls [Enumerable] The complete list of URLs to use from now on
        #
        def update_urls(new_urls)
          @state_mutex.synchronize do
            new_urls.each { |url| add_url_unsafe(url) }

            # Work on a copy of the keys, the Hash is modified while removing
            stale_urls = @url_info.keys.reject { |url| new_urls.include?(url) }
            stale_urls.each { |url| remove_url_unsafe(url) }
          end
        end

        # Registers a URL with the client and starts tracking it.
        # State of an already tracked URL is left untouched.
        #
        def add_url(url)
          @state_mutex.synchronize { add_url_unsafe(url) }
        end

        # Removes a URL from the client and stops tracking it.
        #
        def remove_url(url)
          @state_mutex.synchronize { remove_url_unsafe(url) }
        end

        # @return [Hash] The initial state for a newly tracked URL
        #
        def empty_url_meta
          {
            :open => 0,
            :dead => false
          }
        end

        # Checks out a URL, yields it to the block and checks it in again.
        #
        # When the block raises {RequestFailure}, the URL is marked as dead,
        # the error is stored under `:last_error`, and the error is re-raised
        # so that the caller learns about the failed request.
        #
        # @yield [url] The URL picked for the request
        # @return The value returned by the block
        # @raise [RequestFailure] When the block raises it, or when no live URL is available
        #
        def with_connection
          url, url_meta = get_connection
          raise RequestFailure, "No live URLs available in the pool" unless url

          begin
            yield url
          rescue RequestFailure => e
            @state_mutex.synchronize do
              url_meta[:dead] = true
              url_meta[:last_error] = e
            end
            raise
          ensure
            return_connection(url)
          end
        end

        # Picks a random URL among the live URLs with the lowest number of open
        # requests and increments its open count.
        #
        # The randomness avoids hitting the same node over and over, while
        # preferring the least used URLs gives fairer behavior under high concurrency.
        #
        # @return [Array, nil] A `[url, meta]` pair, or `nil` when no live URL is tracked
        #
        def get_connection
          @state_mutex.synchronize do
            live = @url_info.reject { |_, meta| meta[:dead] }
            next nil if live.empty?

            lowest_open = live.values.map { |meta| meta[:open] }.min
            candidates = live.select { |_, meta| meta[:open] == lowest_open }.to_a

            pick_and_meta = candidates.sample
            pick_and_meta.last[:open] += 1

            pick_and_meta
          end
        end

        # Checks a URL in again, decrementing its open count.
        #
        # @param connection [Object] The URL previously returned by {#get_connection}
        #
        def return_connection(connection)
          @state_mutex.synchronize do
            # Guard against the condition where the URL has already been removed
            meta = @url_info[connection]
            meta[:open] -= 1 if meta
          end
        end

        private

        # Same as {#add_url}; the caller must hold `@state_mutex`.
        def add_url_unsafe(url)
          @client.add_url(url)
          @url_info[url] ||= empty_url_meta
        end

        # Same as {#remove_url}; the caller must hold `@state_mutex`.
        def remove_url_unsafe(url)
          @client.remove_url(url)
          @url_info.delete(url)
        end
      end
    end
  end
end

0 commit comments

Comments
 (0)