Skip to content

Commit 0466d4b

Browse files
committed
Threadsafe connection pooling
1 parent 067842d commit 0466d4b

File tree

5 files changed

+50
-21
lines changed

5 files changed

+50
-21
lines changed

elasticsearch-transport/elasticsearch-transport.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Gem::Specification.new do |s|
2929

3030
s.add_development_dependency "bundler", "> 1"
3131
s.add_development_dependency "rake"
32+
s.add_dependency "concurrent-ruby"
3233

3334
if defined?(RUBY_VERSION) && RUBY_VERSION > '1.9'
3435
s.add_development_dependency "elasticsearch-extensions"

elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ def __rebuild_connections(arguments={})
9999
@state_mutex.synchronize do
100100
@hosts = arguments[:hosts] || []
101101
@options = arguments[:options] || {}
102-
__close_connections
103-
@connections = __build_connections
102+
@connections.update(__build_connections)
104103
end
105104
end
106105

elasticsearch-transport/lib/elasticsearch/transport/transport/connections/collection.rb

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
require 'concurrent'
2+
require 'thread'
3+
14
module Elasticsearch
25
module Transport
36
module Transport
@@ -22,9 +25,28 @@ class Collection
2225
#
2326
def initialize(arguments={})
2427
selector_class = arguments[:selector_class] || DEFAULT_SELECTOR
28+
@connections = Concurrent::Map.new
2529
@connections = arguments[:connections] || []
2630
@selector = arguments[:selector] || selector_class.new(arguments.merge(:connections => self))
31+
@state_mutex = Mutex.new
32+
end
33+
34+
# Adds new connections to the internal pool, and closing connections not in the list
35+
#
36+
# @option arguments [Array] :new_connections An Array of {Connection} objects
37+
def update(new_connections)
38+
new_connections.each {|connection| add_connection(connection)}
39+
40+
@connections.each do |connection,_|
41+
@connections.delete(connection) unless new_connections.include?(connection)
42+
connection.close
43+
end
44+
end
45+
46+
def add_connection(connection)
47+
@connections.put_if_absent(connection, true)
2748
end
49+
alias :<< :add_connection
2850

2951
# Returns an Array of hosts information in this collection as Hashes.
3052
#
@@ -39,7 +61,7 @@ def hosts
3961
# @return [Array]
4062
#
4163
def connections
42-
@connections.reject { |c| c.dead? }
64+
@connections.keys.reject { |c| c.dead? }
4365
end
4466
alias :alive :connections
4567

@@ -48,15 +70,15 @@ def connections
4870
# @return [Array]
4971
#
5072
def dead
51-
@connections.select { |c| c.dead? }
73+
@connections.keys.select { |c| c.dead? }
5274
end
5375

5476
# Returns an Array of all connections, both dead and alive
5577
#
5678
# @return [Array]
5779
#
5880
def all
59-
@connections
81+
@connections.keys
6082
end
6183

6284
# Returns a connection.
@@ -67,7 +89,7 @@ def all
6789
# @return [Connection]
6890
#
6991
def get_connection(options={})
70-
if connections.empty? && dead_connection = dead.sort { |a,b| a.failures <=> b.failures }.first
92+
if connections.empty? && (dead_connection = dead.sort { |a,b| a.failures <=> b.failures }.first)
7193
dead_connection.alive!
7294
end
7395
selector.select(options)
@@ -77,10 +99,9 @@ def each(&block)
7799
connections.each(&block)
78100
end
79101

80-
def slice(*args)
81-
connections.slice(*args)
102+
def [](key)
103+
@connections[key]
82104
end
83-
alias :[] :slice
84105

85106
def size
86107
connections.size

elasticsearch-transport/lib/elasticsearch/transport/transport/connections/connection.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,19 @@ def resurrectable?
118118
}
119119
end
120120

121+
def close
122+
self.connection.close
123+
end
124+
125+
def eql?(other)
126+
other.instance_of?(self.class) && self.host.to_s == other.host.to_s
127+
end
128+
alias :== :eql?
129+
130+
def hash
131+
self.host.hash
132+
end
133+
121134
# @return [String]
122135
#
123136
def to_s

elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,20 +97,15 @@ def __build_connections
9797
@request_options[:headers] = options[:headers]
9898
end
9999

100-
Connections::Collection.new \
101-
:connections => hosts.map { |host|
102-
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
103-
host[:port] ||= DEFAULT_PORT
100+
hosts.map do |host|
101+
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
102+
host[:port] ||= DEFAULT_PORT
104103

105-
host.delete(:user) # auth is not supported here.
106-
host.delete(:password) # use the headers
104+
host.delete(:user) # auth is not supported here.
105+
host.delete(:password) # use the headers
107106

108-
Connections::Connection.new \
109-
:host => host,
110-
:connection => @manticore
111-
},
112-
:selector_class => options[:selector_class],
113-
:selector => options[:selector]
107+
Connections::Connection.new(:host => host, :connection => @manticore)
108+
end
114109
end
115110

116111
# Closes all connections by marking them as dead

0 commit comments

Comments
 (0)