10 changes: 6 additions & 4 deletions CHANGELOG.md
@@ -4,13 +4,11 @@ This file is written in reverse chronological order, newer releases will
appear at the top.

## `master` (Unreleased)
* Add your entries below here, remember to credit yourself however you want
to be credited!
* Do not auto-include DSL in context of `require`. Load DSL with Gem and allow to include it.
[PR #219](https://github.com/capistrano/sshkit/pull/219)
@beatrichartz
* `SSHKit::Backend::Netssh.pool.idle_timeout = 0` doesn't disable connection pooling anymore,
only connection expiration. To disable connection pooling use `SSHKit::Backend::Netssh.pool.enabled = false`
* Add your entries below here, remember to credit yourself however you want
to be credited!
* make sure working directory for commands is properly cleared after `within` blocks
[PR #307](https://github.com/capistrano/sshkit/pull/307)
@steved
@@ -30,6 +28,10 @@ appear at the top.
* Fix a race condition experienced in JRuby that could cause multi-server
deploys to fail. [PR #322](https://github.com/capistrano/sshkit/pull/322)
@mattbrictson
* The ConnectionPool has been rewritten in this release to be more efficient
and have a cleaner internal API. You can still completely disable the pool
by setting `SSHKit::Backend::Netssh.pool.idle_timeout = 0`.
@mattbrictson @byroot [PR #328](https://github.com/capistrano/sshkit/pull/328)
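
  As a rough illustration of the behaviour described above (not part of the changelog itself), disabling the pool now hinges entirely on `idle_timeout`:

  ```ruby
  # Keep pooling, with the default 30-second idle timeout.
  SSHKit::Backend::Netssh.pool.idle_timeout = 30

  # Disable the pool completely: each command gets a fresh connection that is
  # closed as soon as its block finishes.
  SSHKit::Backend::Netssh.pool.idle_timeout = 0
  ```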

## 1.8.1

9 changes: 2 additions & 7 deletions README.md
@@ -500,18 +500,13 @@ seconds. This timeout can be changed as follows:
SSHKit::Backend::Netssh.pool.idle_timeout = 60 # seconds
```

If you suspect the connection pruning is causing problems, you can disable the
expiration behaviour entirely by setting the idle_timeout to zero:
If you suspect the connection pooling is causing problems, you can disable the
pooling behaviour entirely by setting the idle_timeout to zero:

```ruby
SSHKit::Backend::Netssh.pool.idle_timeout = 0 # disabled
```

Or even disable the pooling entirely, but it will severely reduce performance:
```
SSHKit::Backend::Netssh.pool.enabled = false # disabled
```

## Tunneling and other related SSH themes

In order to do special gymnastics with SSH (tunneling, aliasing, complex options, etc.) with SSHKit, it is possible to use [the underlying Net::SSH API](https://github.com/capistrano/sshkit/blob/master/EXAMPLES.md#setting-global-ssh-options); however, in many cases it is preferable to use the system SSH configuration file at [`~/.ssh/config`](http://man.cx/ssh_config). This allows you to have personal configuration tied to your machine that does not have to be committed with the repository. If this is not suitable (everyone on the team needs a proxy command, or some special aliasing), a file in the same format can be placed in the project directory at `~/yourproject/.ssh/config`; this will be merged with the system settings in `~/.ssh/config`, and with any configuration specified in [`SSHKit::Backend::Netssh.config.ssh_options`](https://github.com/capistrano/sshkit/blob/master/lib/sshkit/backends/netssh.rb#L133).
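
For instance, a minimal sketch of setting these options programmatically via `ssh_options` (the specific option values are illustrative, not a recommendation):

```ruby
# Merged with ~/.ssh/config and the project-level .ssh/config at connect time.
SSHKit::Backend::Netssh.config.ssh_options = {
  forward_agent: true,                # illustrative
  keys: %w(/home/deploy/.ssh/id_rsa), # illustrative
  auth_methods: %w(publickey)
}
```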
235 changes: 134 additions & 101 deletions lib/sshkit/backends/connection_pool.rb
@@ -1,126 +1,159 @@
require "monitor"
require "thread"

# Since we call to_s on new_connection_args and use that as a hash
# We need to make sure the memory address of the object is not used as part of the key
# Otherwise identical objects with different memory address won't get a hash hit.
# In the case of proxy commands, this can lead to proxy processes leaking
# And in severe cases can cause deploys to fail due to default file descriptor limits
# An alternate solution would be to use a different means of generating hash keys
# Since we call to_s on new connection arguments and use that as a cache key, we
# need to make sure the memory address of the object is not used as part of the
# key. Otherwise identical objects with different memory address won't reuse the
# cache.
#
# In the case of proxy commands, this can lead to proxy processes leaking, and
# in severe cases can cause deploys to fail due to default file descriptor
# limits. An alternate solution would be to use a different means of generating
# hash keys.
#
require "net/ssh/proxy/command"
class Net::SSH::Proxy::Command
# Ensure a stable string value is used, rather than memory address.
def inspect
@command_line_template
end
end

module SSHKit

module Backend

class ConnectionPool

attr_accessor :idle_timeout, :enabled

def initialize
self.idle_timeout = 30
self.enabled = true
@mutex = Mutex.new
@pool = {}
end
# The ConnectionPool caches connections and allows them to be reused, so long as
# the reuse happens within the `idle_timeout` period. Timed out connections are
# closed, forcing a new connection to be used in that case.
#
# Additionally, a background thread is started to check for abandoned
# connections that have timed out without any attempt at being reused. These
# are eventually closed as well and removed from the cache.
#
# If `idle_timeout` is set to `false`, `0`, or `nil`, no caching is performed, and
# a new connection is created and then immediately closed each time. The default
# timeout is 30 (seconds).
#
# There is a single public method: `with`. Example usage:
#
# pool = SSHKit::Backend::ConnectionPool.new
# pool.with(Net::SSH.method(:start), "host", "username") do |connection|
# # do stuff with connection
# end
#
class SSHKit::Backend::ConnectionPool
attr_accessor :idle_timeout

def initialize(idle_timeout=30)
@idle_timeout = idle_timeout
@caches = {}
@caches.extend(MonitorMixin)
@timed_out_connections = Queue.new
Thread.new { run_eviction_loop }
end

def prune_expired?
idle_timeout && idle_timeout != 0
end
# Creates a new connection or reuses a cached connection (if possible) and
# yields the connection to the given block. Connections are created by
# invoking the `connection_factory` proc with the given `args`. The arguments
# are used to construct a key used for caching.
def with(connection_factory, *args)
cache = find_cache(args)
conn = cache.pop || begin
connection_factory.call(*args)
end
yield(conn)
ensure
cache.push(conn) unless conn.nil?
end

def checkout(*new_connection_args, &block)
entry = nil
key = new_connection_args.to_s
if enabled
prune_expired if prune_expired?
entry = find_live_entry(key)
end
entry || create_new_entry(new_connection_args, key, &block)
end
# Immediately remove all cached connections, without closing them. This only
# exists for unit test purposes.
def flush_connections
caches.synchronize { caches.clear }
end

def checkin(entry)
if enabled
if prune_expired?
entry.expires_at = Time.now + idle_timeout
prune_expired
end
@mutex.synchronize do
@pool[entry.key] ||= []
@pool[entry.key] << entry
end
end
end
# Immediately close all cached connections and empty the pool.
def close_connections
caches.synchronize do
caches.values.each(&:clear)
caches.clear
process_deferred_close
end
end

def close_connections
@mutex.synchronize do
@pool.values.flatten.map(&:connection).uniq.each do |conn|
if conn.respond_to?(:closed?) && conn.respond_to?(:close)
conn.close unless conn.closed?
end
end
@pool.clear
end
end
private

def flush_connections
@mutex.synchronize { @pool.clear }
end
attr_reader :caches, :timed_out_connections

private

def prune_expired
@mutex.synchronize do
@pool.each_value do |entries|
entries.collect! do |entry|
if entry.expired?
entry.close unless entry.closed?
nil
else
entry
end
end.compact!
end
end
end
def cache_enabled?
idle_timeout && idle_timeout > 0
Contributor: So we're back to either disabling the pooling entirely, or having to prune connections?

I don't know if I'm alone, but in our case a full deploy takes about 4 to 5 minutes and we do it from inside the datacenter, so pruning idle connections is really not a concern.

Any way we could disable it without disabling pooling?

Member (author): My goal was to improve efficiency across the board, rather than add conditionals as workarounds. The point is that if you just set `idle_timeout` to a high value, you shouldn't pay any performance penalty, because a mutex is no longer being used for pruning.

But, like I said, the proof will be in the numbers.

Contributor: Yeah, we'll see. I didn't have time to test yet.
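
(As a rough sketch of the workaround the author suggests, the timeout can simply be raised so that pruning effectively never happens while pooling stays enabled; the exact value below is illustrative:)

```ruby
# Keep the pool, but make idle connections effectively never expire.
SSHKit::Backend::Netssh.pool.idle_timeout = 24 * 60 * 60 # one day
```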

end

def find_live_entry(key)
@mutex.synchronize do
return nil unless @pool.key?(key)
while (entry = @pool[key].shift)
return entry if entry.live?
end
end
nil
end
# Look up a Cache that matches the given connection arguments.
def find_cache(args)
if cache_enabled?
key = args.to_s
caches[key] || thread_safe_find_or_create_cache(key)
else
NilCache.new(method(:silently_close_connection))
end
end

def create_new_entry(args, key, &block)
Entry.new block.call(*args), key
# Cache creation needs to happen in a mutex, because otherwise a race
# condition might cause two identical caches to be created for the same key.
def thread_safe_find_or_create_cache(key)
caches.synchronize do
caches[key] ||= begin
Cache.new(idle_timeout, method(:silently_close_connection_later))
end
end
end

Entry = Struct.new(:connection, :key) do
attr_accessor :expires_at

def live?
!expired? && !closed?
end
# Loops indefinitely to close connections and to find abandoned connections
# that need to be closed.
def run_eviction_loop
loop do
process_deferred_close

def expired?
expires_at && Time.now > expires_at
end
# Periodically sweep all Caches to evict stale connections
sleep([idle_timeout, 5].min)
caches.values.each(&:evict)
end
end

def close
connection.respond_to?(:close) && connection.close
end
# Immediately close any connections that are pending closure.
# rubocop:disable Lint/HandleExceptions
def process_deferred_close
until timed_out_connections.empty?
connection = timed_out_connections.pop(true)
silently_close_connection(connection)
end
rescue ThreadError
# Queue#pop(true) raises ThreadError if the queue is empty.
# This could only happen if `close_connections` is called at the same time
# the background eviction thread has woken up to close connections. In any
# case, it is not something we need to care about, since an empty queue is
# perfectly OK.
end
# rubocop:enable Lint/HandleExceptions

def closed?
connection.respond_to?(:closed?) && connection.closed?
end
end
# Adds the connection to a queue that is processed asynchronously by a
# background thread. The connection will eventually be closed.
def silently_close_connection_later(connection)
timed_out_connections << connection
end

end
# Close the given `connection` immediately, assuming it responds to a `close`
# method. If it doesn't, or if `nil` is provided, it is silently ignored. Any
# `StandardError` is also silently ignored. Returns `true` if the connection
# was closed; `false` if it was already closed or could not be closed due to
# an error.
def silently_close_connection(connection)
return false unless connection.respond_to?(:close)
return false if connection.respond_to?(:closed?) && connection.closed?
connection.close
true
rescue StandardError
false
end
end

require "sshkit/backends/connection_pool/cache"
require "sshkit/backends/connection_pool/nil_cache"
67 changes: 67 additions & 0 deletions lib/sshkit/backends/connection_pool/cache.rb
@@ -0,0 +1,67 @@
# A Cache holds connections for a given key. Each connection is stored along
# with an expiration time so that its idle duration can be measured.
class SSHKit::Backend::ConnectionPool::Cache
def initialize(idle_timeout, closer)
@connections = []
@connections.extend(MonitorMixin)
@idle_timeout = idle_timeout
@closer = closer
end

# Remove and return a fresh connection from this Cache. Returns `nil` if
# the Cache is empty or if all existing connections have gone stale.
def pop
connections.synchronize do
evict
_, connection = connections.pop
connection
end
end

# Return a connection to this Cache.
def push(conn)
# No need to cache if the connection has already been closed.
return if closed?(conn)

connections.synchronize do
connections.push([Time.now + idle_timeout, conn])
end
end

# Close and remove any connections in this Cache that have been idle for
# too long.
def evict
# Peek at the first connection to see if it is still fresh. If so, we can
# return right away without needing to use `synchronize`.
first_expires_at, _connection = connections.first
return if first_expires_at.nil? || fresh?(first_expires_at)

connections.synchronize do
fresh, stale = connections.partition do |expires_at, _|
fresh?(expires_at)
end
connections.replace(fresh)
stale.each { |_, conn| closer.call(conn) }
end
end

# Close all connections and completely clear the cache.
def clear
connections.synchronize do
connections.map(&:last).each(&closer)
connections.clear
end
end

private

attr_reader :connections, :idle_timeout, :closer

def fresh?(expires_at)
expires_at > Time.now
end

def closed?(conn)
conn.respond_to?(:closed?) && conn.closed?
end
end
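
A small sketch of the Cache in isolation, with symbols standing in for real connections and a deliberately tiny timeout so eviction is observable right away:

```ruby
require "sshkit"

closed = []
cache  = SSHKit::Backend::ConnectionPool::Cache.new(0.01, ->(conn) { closed << conn })

cache.push(:connection_a) # cached with an expiry of now + 0.01s
sleep 0.05                # let the entry go stale
cache.evict               # stale entries are handed to the closer
puts closed.inspect       # => [:connection_a]
puts cache.pop.inspect    # => nil, nothing fresh is left
```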
11 changes: 11 additions & 0 deletions lib/sshkit/backends/connection_pool/nil_cache.rb
@@ -0,0 +1,11 @@
# A cache that holds no connections. Any connection provided to this cache
# is simply closed.
SSHKit::Backend::ConnectionPool::NilCache = Struct.new(:closer) do
def pop
nil
end

def push(conn)
closer.call(conn)
end
end
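
NilCache, used when caching is disabled, never stores anything; a quick hedged illustration:

```ruby
require "sshkit"

closed = []
nil_cache = SSHKit::Backend::ConnectionPool::NilCache.new(->(conn) { closed << conn })

nil_cache.pop            # => nil, there is never anything to reuse
nil_cache.push(:conn_a)  # the closer is invoked immediately
puts closed.inspect      # => [:conn_a]
```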