Skip to content

Commit

Permalink
Fix out_of_band hook (puma#2218)
Browse files Browse the repository at this point in the history
* Fix out_of_band hook
Add server tests to ensure oob hooks fire as expected.

* Use mutex for out of band hook
Prevent overlapping requests from being processed during
out of band hook.
  • Loading branch information
wjordan authored Apr 16, 2020
1 parent 67f9b1f commit df9cdc7
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 9 deletions.
1 change: 1 addition & 0 deletions History.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* Set `Connection: closed` header when queue requests is disabled (#2216)
* Pass queued requests to thread pool on server shutdown (#2122)
* Fixed a few minor concurrency bugs in ThreadPool that may have affected non-GVL Rubies (#2220)
* Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
11 changes: 7 additions & 4 deletions lib/puma/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,7 @@ def handle_servers
end

pool << client
busy_threads = pool.wait_until_not_full
if busy_threads == 0
@options[:out_of_band].each(&:call) if @options[:out_of_band]
end
pool.wait_until_not_full
end
rescue SystemCallError
# nothing
Expand Down Expand Up @@ -439,6 +436,12 @@ def process_client(client, buffer)
rescue StandardError => e
@events.unknown_error self, e, "Client"
end

if @options[:out_of_band]
@thread_pool.with_mutex do
@options[:out_of_band].each(&:call) if @thread_pool.busy_threads == 1
end
end
end
end

Expand Down
10 changes: 5 additions & 5 deletions lib/puma/thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def pool_capacity
waiting + (@max - spawned)
end

def busy_threads
with_mutex { @spawned - @waiting + @todo.size }
end

# :nodoc:
#
# Must be called with @mutex held!
Expand Down Expand Up @@ -188,9 +192,6 @@ def <<(work)
# method would not block and another request would be added into the reactor
# by the server. This would continue until a fully bufferend request
# makes it through the reactor and can then be processed by the thread pool.
#
# Returns the current number of busy threads, or +nil+ if shutting down.
#
def wait_until_not_full
with_mutex do
while true
Expand All @@ -200,8 +201,7 @@ def wait_until_not_full
# is work queued that cannot be handled by waiting
# threads, then accept more work until we would
# spin up the max number of threads.
busy_threads = @spawned - @waiting + @todo.size
return busy_threads if @max > busy_threads
return if busy_threads < @max

@not_full.wait @mutex
end
Expand Down
81 changes: 81 additions & 0 deletions test/test_puma_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -984,4 +984,85 @@ def test_http10_connection_header_no_queue
assert_equal ["HTTP/1.0 200 OK", "Content-Length: 0"], header(sock)
sock.close
end

def oob_server(**options)
@request_count = 0
@oob_count = 0
@start = Time.now
in_oob = Mutex.new
@mutex = Mutex.new
@oob_finished = ConditionVariable.new
oob_wait = options.delete(:oob_wait)
oob = -> do
in_oob.synchronize do
@mutex.synchronize do
@oob_count += 1
@oob_finished.signal
@oob_finished.wait(@mutex, 1) if oob_wait
end
end
end
@server = Puma::Server.new @app, @events, out_of_band: [oob], **options
@server.min_threads = 5
@server.max_threads = 5
server_run app: ->(_) do
raise 'OOB conflict' if in_oob.locked?
@mutex.synchronize {@request_count += 1}
[200, {}, [""]]
end
end

# Sequential requests should trigger out_of_band hooks after every request.
def test_out_of_band
n = 100
oob_server queue_requests: false
n.times do
@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex, 1)
end
end
assert_equal n, @request_count
assert_equal n, @oob_count
end

# Streaming requests on parallel connections without delay should trigger
# out_of_band hooks only once after the final request.
def test_out_of_band_stream
n = 100
threads = 10
oob_server
req = "GET / HTTP/1.1\r\n"
@mutex.synchronize do
Array.new(threads) do
Thread.new do
send_http "#{req}\r\n" * (n/threads-1) + "#{req}Connection: close\r\n\r\n"
end
end.each(&:join)
@oob_finished.wait(@mutex, 1)
end
assert_equal n, @request_count
assert_equal 1, @oob_count
end

def test_out_of_band_overlapping_requests
oob_server oob_wait: true
sock = nil
sock2 = nil
@mutex.synchronize do
sock2 = send_http "GET / HTTP/1.0\r\n"
sleep 0.01
sock = send_http "GET / HTTP/1.0\r\n\r\n"
# Request 1 processed
@oob_finished.wait(@mutex) # enter oob
sock2 << "\r\n"
sleep 0.01
@oob_finished.signal # exit oob
# Request 2 processed
@oob_finished.wait(@mutex) # enter oob
@oob_finished.signal # exit oob
end
assert_match(/200/, sock.read)
assert_match(/200/, sock2.read)
end
end

0 comments on commit df9cdc7

Please sign in to comment.