diff --git a/CHANGELOG.md b/CHANGELOG.md index 340c0bf6..50d2b87a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,14 @@ Dalli Changelog Unreleased ========== +- Better handle memcached requests being interrupted by Thread#raise or Thread#kill (byroot) +- Unexpected errors are no longer treated as `Dalli::NetworkError`, including errors raised by `Timeout.timeout` (byroot) + 3.2.4 ========== -- Cache PID calls for performance since glibc no longer caches in recent versions (casperisfine) -- Preallocate the read buffer in Socket#readfull (casperisfine) +- Cache PID calls for performance since glibc no longer caches in recent versions (byroot) +- Preallocate the read buffer in Socket#readfull (byroot) 3.2.3 ========== @@ -51,7 +54,7 @@ Unreleased 3.1.4 ========== -- Improve response parsing performance (casperisfine) +- Improve response parsing performance (byroot) - Reorganize binary protocol parsing a bit (petergoldstein) - Fix handling of non-ASCII keys in get_multi (petergoldstein) diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb index 89c4fd25..f5cffb6c 100644 --- a/lib/dalli/protocol/base.rb +++ b/lib/dalli/protocol/base.rb @@ -32,7 +32,13 @@ def request(opkey, *args) verify_state(opkey) begin - send(opkey, *args) + @connection_manager.start_request! + response = send(opkey, *args) + + # pipelined_get emits the query but doesn't read the response(s) + @connection_manager.finish_request! unless opkey == :pipelined_get + + response rescue Dalli::MarshalError => e log_marshal_err(args.first, e) raise @@ -40,7 +46,8 @@ def request(opkey, *args) raise rescue StandardError => e log_unexpected_err(e) - down! + close + raise end end @@ -65,10 +72,9 @@ def unlock!; end # # Returns nothing. def pipeline_response_setup - verify_state(:getkq) + verify_pipelined_state(:getkq) write_noop response_buffer.reset - @connection_manager.start_request! 
end # Attempt to receive and parse as many key/value pairs as possible @@ -169,6 +175,11 @@ def verify_state(opkey) raise_down_error unless ensure_connected! end + def verify_pipelined_state(_opkey) + @connection_manager.confirm_in_progress! + raise_down_error unless connected? + end + # The socket connection to the underlying server is initialized as a side # effect of this call. In fact, this is the ONLY place where that # socket connection is initialized. diff --git a/lib/dalli/protocol/connection_manager.rb b/lib/dalli/protocol/connection_manager.rb index 5a0b6ff0..0f5a01b6 100644 --- a/lib/dalli/protocol/connection_manager.rb +++ b/lib/dalli/protocol/connection_manager.rb @@ -54,6 +54,7 @@ def establish_connection @sock = memcached_socket @pid = PIDCache.pid + @request_in_progress = false rescue SystemCallError, Timeout::Error, EOFError, SocketError => e # SocketError = DNS resolution failure error_on_request!(e) @@ -98,7 +99,13 @@ def socket_timeout end def confirm_ready! - error_on_request!(RuntimeError.new('Already writing to socket')) if request_in_progress? + close if request_in_progress? + close_on_fork if fork_detected? + end + + def confirm_in_progress! + raise '[Dalli] No request in progress. This may be a bug in Dalli.' unless request_in_progress? + close_on_fork if fork_detected? end @@ -124,10 +131,14 @@ def request_in_progress? end def start_request! + raise '[Dalli] Request already in progress. This may be a bug in Dalli.' if @request_in_progress + @request_in_progress = true end def finish_request! + raise '[Dalli] No request in progress. This may be a bug in Dalli.' unless @request_in_progress + @request_in_progress = false end @@ -136,36 +147,26 @@ def abort_request! end def read_line - start_request! data = @sock.gets("\r\n") error_on_request!('EOF in read_line') if data.nil? - finish_request! data rescue SystemCallError, Timeout::Error, EOFError => e error_on_request!(e) end def read(count) - start_request! 
- data = @sock.readfull(count) - finish_request! - data + @sock.readfull(count) rescue SystemCallError, Timeout::Error, EOFError => e error_on_request!(e) end def write(bytes) - start_request! - result = @sock.write(bytes) - finish_request! - result + @sock.write(bytes) rescue SystemCallError, Timeout::Error => e error_on_request!(e) end - # Non-blocking read. Should only be used in the context - # of a caller who has called start_request!, but not yet - # called finish_request!. Here to support the operation + # Non-blocking read. Here to support the operation # of the get_multi operation def read_nonblock @sock.read_available diff --git a/test/integration/test_network.rb b/test/integration/test_network.rb index e5461d36..b2332244 100644 --- a/test/integration/test_network.rb +++ b/test/integration/test_network.rb @@ -113,6 +113,112 @@ end end + it 'handles asynchronous Thread#raise' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + thread = Thread.new do + loop do + assert_instance_of Integer, dc.set("key:#{i}", i.to_s) + end + rescue RuntimeError + nil # expected + end + thread.join(rand(0.01..0.2)) + + thread.raise('Test Timeout Error') + joined_thread = thread.join(1) + + refute_nil joined_thread + refute_predicate joined_thread, :alive? + assert_equal i.to_s, dc.get("key:#{i}") + end + end + end + end + + it 'handles asynchronous Thread#raise during pipelined get' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + expected_response = 100.times.to_h { |x| ["key:#{i}:#{x}", x.to_s] } + expected_response.each do |key, val| + dc.set(key, val) + end + + thread = Thread.new do + loop do + assert_equal expected_response, dc.get_multi(expected_response.keys) + end + rescue RuntimeError + nil # expected + end + thread.join(rand(0.01..0.2)) + + thread.raise('Test Timeout Error') + joined_thread = thread.join(1) + + refute_nil joined_thread + refute_predicate joined_thread, :alive? 
+ assert_equal expected_response, dc.get_multi(expected_response.keys) + end + end + end + end + + it 'handles asynchronous Thread#kill' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + thread = Thread.new do + loop do + assert_instance_of Integer, dc.set("key:#{i}", i.to_s) + end + rescue RuntimeError + nil # expected + end + thread.join(rand(0.01..0.2)) + + thread.kill + joined_thread = thread.join(1) + + refute_nil joined_thread + refute_predicate joined_thread, :alive? + assert_equal i.to_s, dc.get("key:#{i}") + end + end + end + end + + it 'handles asynchronous Thread#kill during pipelined get' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + expected_response = 100.times.to_h { |x| ["key:#{i}:#{x}", x.to_s] } + expected_response.each do |key, val| + dc.set(key, val) + end + + thread = Thread.new do + loop do + assert_equal expected_response, dc.get_multi(expected_response.keys) + end + rescue RuntimeError + nil # expected + end + thread.join(rand(0.01..0.2)) + + thread.kill + joined_thread = thread.join(1) + + refute_nil joined_thread + refute_predicate joined_thread, :alive? 
+ assert_equal expected_response, dc.get_multi(expected_response.keys) + end + end + end + end + it 'passes a simple smoke test on a TCP socket' do memcached_persistent(p) do |dc, port| resp = dc.flush diff --git a/test/integration/test_pipelined_get.rb b/test/integration/test_pipelined_get.rb index 8578f848..2f3f0015 100644 --- a/test/integration/test_pipelined_get.rb +++ b/test/integration/test_pipelined_get.rb @@ -82,7 +82,7 @@ describe 'pipeline_next_responses' do it 'raises NetworkError when called before pipeline_response_setup' do memcached_persistent(p) do |dc| - server = dc.instance_variable_get(:@ring).servers.first + server = dc.send(:ring).servers.first server.request(:pipelined_get, %w[a b]) assert_raises Dalli::NetworkError do server.pipeline_next_responses @@ -92,7 +92,7 @@ it 'raises NetworkError when called after pipeline_abort' do memcached_persistent(p) do |dc| - server = dc.instance_variable_get(:@ring).servers.first + server = dc.send(:ring).servers.first server.request(:pipelined_get, %w[a b]) server.pipeline_response_setup server.pipeline_abort