Rewrite ConnectionPool for efficiency #328
Merged
leehambley merged 1 commit into capistrano:master from mattbrictson:connection-pool-rewrite on Feb 23, 2016
Changes from all commits
```diff
@@ -1,126 +1,159 @@
+require "monitor"
 require "thread"

-# Since we call to_s on new_connection_args and use that as a hash
-# We need to make sure the memory address of the object is not used as part of the key
-# Otherwise identical objects with different memory address won't get a hash hit.
-# In the case of proxy commands, this can lead to proxy processes leaking
-# And in severe cases can cause deploys to fail due to default file descriptor limits
-# An alternate solution would be to use a different means of generating hash keys
+# Since we call to_s on new connection arguments and use that as a cache key, we
+# need to make sure the memory address of the object is not used as part of the
+# key. Otherwise identical objects with different memory address won't reuse the
+# cache.
+#
+# In the case of proxy commands, this can lead to proxy processes leaking, and
+# in severe cases can cause deploys to fail due to default file descriptor
+# limits. An alternate solution would be to use a different means of generating
+# hash keys.
+#
 require "net/ssh/proxy/command"
 class Net::SSH::Proxy::Command
+  # Ensure a stable string value is used, rather than memory address.
   def inspect
     @command_line_template
   end
 end

-module SSHKit
-
-  module Backend
-
-    class ConnectionPool
-
-      attr_accessor :idle_timeout, :enabled
-
-      def initialize
-        self.idle_timeout = 30
-        self.enabled = true
-        @mutex = Mutex.new
-        @pool = {}
-      end
+# The ConnectionPool caches connections and allows them to be reused, so long as
+# the reuse happens within the `idle_timeout` period. Timed out connections are
+# closed, forcing a new connection to be used in that case.
+#
+# Additionally, a background thread is started to check for abandoned
+# connections that have timed out without any attempt at being reused. These
+# are eventually closed as well and removed from the cache.
+#
+# If `idle_timeout` set to `false`, `0`, or `nil`, no caching is performed, and
+# a new connection is created and then immediately closed each time. The default
+# timeout is 30 (seconds).
+#
+# There is a single public method: `with`. Example usage:
+#
+#   pool = SSHKit::Backend::ConnectionPool.new
+#   pool.with(Net::SSH.method(:start), "host", "username") do |connection|
+#     # do stuff with connection
+#   end
+#
+class SSHKit::Backend::ConnectionPool
+  attr_accessor :idle_timeout
+
+  def initialize(idle_timeout=30)
+    @idle_timeout = idle_timeout
+    @caches = {}
+    @caches.extend(MonitorMixin)
+    @timed_out_connections = Queue.new
+    Thread.new { run_eviction_loop }
+  end

-      def prune_expired?
-        idle_timeout && idle_timeout != 0
-      end
+  # Creates a new connection or reuses a cached connection (if possible) and
+  # yields the connection to the given block. Connections are created by
+  # invoking the `connection_factory` proc with the given `args`. The arguments
+  # are used to construct a key used for caching.
+  def with(connection_factory, *args)
+    cache = find_cache(args)
+    conn = cache.pop || begin
+      connection_factory.call(*args)
+    end
+    yield(conn)
+  ensure
+    cache.push(conn) unless conn.nil?
+  end

-      def checkout(*new_connection_args, &block)
-        entry = nil
-        key = new_connection_args.to_s
-        if enabled
-          prune_expired if prune_expired?
-          entry = find_live_entry(key)
-        end
-        entry || create_new_entry(new_connection_args, key, &block)
-      end
+  # Immediately remove all cached connections, without closing them. This only
+  # exists for unit test purposes.
+  def flush_connections
+    caches.synchronize { caches.clear }
+  end

-      def checkin(entry)
-        if enabled
-          if prune_expired?
-            entry.expires_at = Time.now + idle_timeout
-            prune_expired
-          end
-          @mutex.synchronize do
-            @pool[entry.key] ||= []
-            @pool[entry.key] << entry
-          end
-        end
-      end
+  # Immediately close all cached connections and empty the pool.
+  def close_connections
+    caches.synchronize do
+      caches.values.each(&:clear)
+      caches.clear
+      process_deferred_close
+    end
+  end

-      def close_connections
-        @mutex.synchronize do
-          @pool.values.flatten.map(&:connection).uniq.each do |conn|
-            if conn.respond_to?(:closed?) && conn.respond_to?(:close)
-              conn.close unless conn.closed?
-            end
-          end
-          @pool.clear
-        end
-      end
+  private

-      def flush_connections
-        @mutex.synchronize { @pool.clear }
-      end
+  attr_reader :caches, :timed_out_connections

-      private

-      def prune_expired
-        @mutex.synchronize do
-          @pool.each_value do |entries|
-            entries.collect! do |entry|
-              if entry.expired?
-                entry.close unless entry.closed?
-                nil
-              else
-                entry
-              end
-            end.compact!
-          end
-        end
-      end
+  def cache_enabled?
+    idle_timeout && idle_timeout > 0
+  end

-      def find_live_entry(key)
-        @mutex.synchronize do
-          return nil unless @pool.key?(key)
-          while (entry = @pool[key].shift)
-            return entry if entry.live?
-          end
-        end
-        nil
-      end
+  # Look up a Cache that matches the given connection arguments.
+  def find_cache(args)
+    if cache_enabled?
+      key = args.to_s
+      caches[key] || thread_safe_find_or_create_cache(key)
+    else
+      NilCache.new(method(:silently_close_connection))
+    end
+  end

-      def create_new_entry(args, key, &block)
-        Entry.new block.call(*args), key
+  # Cache creation needs to happen in a mutex, because otherwise a race
+  # condition might cause two identical caches to be created for the same key.
+  def thread_safe_find_or_create_cache(key)
+    caches.synchronize do
+      caches[key] ||= begin
+        Cache.new(idle_timeout, method(:silently_close_connection_later))
       end
+    end
+  end

-      Entry = Struct.new(:connection, :key) do
-        attr_accessor :expires_at

-        def live?
-          !expired? && !closed?
-        end
+  # Loops indefinitely to close connections and to find abandoned connections
+  # that need to be closed.
+  def run_eviction_loop
+    loop do
+      process_deferred_close

-        def expired?
-          expires_at && Time.now > expires_at
-        end
+      # Periodically sweep all Caches to evict stale connections
+      sleep([idle_timeout, 5].min)
+      caches.values.each(&:evict)
+    end
+  end

-        def close
-          connection.respond_to?(:close) && connection.close
-        end
+  # Immediately close any connections that are pending closure.
+  # rubocop:disable Lint/HandleExceptions
+  def process_deferred_close
+    until timed_out_connections.empty?
+      connection = timed_out_connections.pop(true)
+      silently_close_connection(connection)
+    end
+  rescue ThreadError
+    # Queue#pop(true) raises ThreadError if the queue is empty.
+    # This could only happen if `close_connections` is called at the same time
+    # the background eviction thread has woken up to close connections. In any
+    # case, it is not something we need to care about, since an empty queue is
+    # perfectly OK.
+  end
+  # rubocop:enable Lint/HandleExceptions

-        def closed?
-          connection.respond_to?(:closed?) && connection.closed?
-        end
-      end
+  # Adds the connection to a queue that is processed asynchronously by a
+  # background thread. The connection will eventually be closed.
+  def silently_close_connection_later(connection)
+    timed_out_connections << connection
+  end

-    end
+  # Close the given `connection` immediately, assuming it responds to a `close`
+  # method. If it doesn't, or if `nil` is provided, it is silently ignored. Any
+  # `StandardError` is also silently ignored. Returns `true` if the connection
+  # was closed; `false` if it was already closed or could not be closed due to
+  # an error.
+  def silently_close_connection(connection)
+    return false unless connection.respond_to?(:close)
+    return false if connection.respond_to?(:closed?) && connection.closed?
+    connection.close
+    true
+  rescue StandardError
+    false
   end
 end

+require "sshkit/backends/connection_pool/cache"
+require "sshkit/backends/connection_pool/nil_cache"
```
@@ -0,0 +1,67 @@
```ruby
# A Cache holds connections for a given key. Each connection is stored along
# with an expiration time so that its idle duration can be measured.
class SSHKit::Backend::ConnectionPool::Cache
  def initialize(idle_timeout, closer)
    @connections = []
    @connections.extend(MonitorMixin)
    @idle_timeout = idle_timeout
    @closer = closer
  end

  # Remove and return a fresh connection from this Cache. Returns `nil` if
  # the Cache is empty or if all existing connections have gone stale.
  def pop
    connections.synchronize do
      evict
      _, connection = connections.pop
      connection
    end
  end

  # Return a connection to this Cache.
  def push(conn)
    # No need to cache if the connection has already been closed.
    return if closed?(conn)

    connections.synchronize do
      connections.push([Time.now + idle_timeout, conn])
    end
  end

  # Close and remove any connections in this Cache that have been idle for
  # too long.
  def evict
    # Peek at the first connection to see if it is still fresh. If so, we can
    # return right away without needing to use `synchronize`.
    first_expires_at, _connection = connections.first
    return if first_expires_at.nil? || fresh?(first_expires_at)

    connections.synchronize do
      fresh, stale = connections.partition do |expires_at, _|
        fresh?(expires_at)
      end
      connections.replace(fresh)
      stale.each { |_, conn| closer.call(conn) }
    end
  end

  # Close all connections and completely clear the cache.
  def clear
    connections.synchronize do
      connections.map(&:last).each(&closer)
      connections.clear
    end
  end

  private

  attr_reader :connections, :idle_timeout, :closer

  def fresh?(expires_at)
    expires_at > Time.now
  end

  def closed?(conn)
    conn.respond_to?(:closed?) && conn.closed?
  end
end
```
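As a quick illustration of the bookkeeping above (again, not part of the PR): each cached entry is an `[expires_at, connection]` pair, and `evict` hands stale entries to the closer callback. Assuming the sshkit gem containing this class is loaded:

```ruby
require "sshkit"

closed = []
# Use a tiny idle_timeout so the entry goes stale almost immediately.
cache = SSHKit::Backend::ConnectionPool::Cache.new(0.01, closed.method(:push))

conn = Object.new # any object lacking #closed? is treated as open
cache.push(conn)  # stored as [Time.now + 0.01, conn]
sleep 0.05
cache.evict       # entry is now stale, so the closer receives it
closed.size       # => 1
cache.pop         # => nil, nothing fresh remains
```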
@@ -0,0 +1,11 @@
```ruby
# A cache that holds no connections. Any connection provided to this cache
# is simply closed.
SSHKit::Backend::ConnectionPool::NilCache = Struct.new(:closer) do
  def pop
    nil
  end

  def push(conn)
    closer.call(conn)
  end
end
```
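To see the NilCache path in action (illustration only, not from the PR): when the pool is constructed with an `idle_timeout` of `0`, `nil`, or `false`, `find_cache` returns a `NilCache`, so every connection handed back at the end of `with` is closed immediately rather than cached. Assuming the sshkit gem is loaded:

```ruby
require "sshkit"

pool = SSHKit::Backend::ConnectionPool.new(0) # caching disabled

fake_connection = Class.new do
  def close
    @closed = true
  end

  def closed?
    !!@closed
  end
end.new

pool.with(proc { fake_connection }, "example.com", "deploy") do |conn|
  # run commands over conn ...
end

fake_connection.closed? # => true: NilCache closed it as soon as it was pushed back
```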
So we're back to either disabling pooling entirely, or having to prune connections?
I don't know if I'm alone, but in our case a full deploy takes about 4 to 5 minutes and we do it from inside the datacenter, so pruning idle connections is really not a concern.
Any way we could disable the pruning without disabling pooling?
My goal was to improve efficiency across the board, rather than add conditionals as workarounds. The point is that if you just set `idle_timeout` to a high value, you shouldn't pay any performance penalty, because a mutex is no longer being used for pruning. But, like I said, the proof will be in the numbers.
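For example (a hedged illustration, not from this thread): Capistrano users who never want idle pruning to kick in during a deploy could simply raise the timeout on the pool, rather than needing a separate on/off switch. This assumes the Netssh backend's class-level `pool` accessor:

```ruby
# Somewhere in deploy configuration.
SSHKit::Backend::Netssh.pool.idle_timeout = 600 # seconds, comfortably longer than a 4-5 minute deploy
```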
Yeah, we'll see. I haven't had time to test yet.