Skip to content

Commit

Permalink
Ensure not to purge socket when using other place
Browse files Browse the repository at this point in the history
Change to default ref count is 1 in case that socket is expired but it
is used.
  • Loading branch information
ganmacs committed Apr 22, 2019
1 parent 75fc463 commit f892f42
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 29 deletions.
44 changes: 21 additions & 23 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def purge_obsolete_socks
def fetch_or(key = Thread.current.object_id)
@mutex.synchronize do
unless @active_socks[key]
@active_socks[key] = TimedSocket.new(timeout, yield, 0)
@active_socks[key] = TimedSocket.new(timeout, yield, 1)
@log.debug("connect new socket #{@active_socks[key]}")
return @active_socks[key].sock
end
Expand All @@ -634,18 +634,19 @@ def fetch_or(key = Thread.current.object_id)
@active_socks[key] = TimedSocket.new(timeout, yield, 0)
end

@active_socks[key].ref += 1;
@active_socks[key].sock
end
end

def inc_ref(key = Thread.current.object_id)
def dec_ref(key = Thread.current.object_id)
@mutex.synchronize do
if @active_socks[key]
@active_socks[key].ref += 1
@active_socks[key].ref -= 1
elsif @inactive_socks[key]
@inactive_socks[key].ref += 1
@inactive_socks[key].ref -= 1
else
@log.warn("Not found key for inc_ref: #{key}")
@log.warn("Not found key for dec_ref: #{key}")
end
end
end
Expand All @@ -666,7 +667,7 @@ def dec_ref_by_value(val)
@inactive_socks[key].ref -= 1
return
else
@log.warn("Not found key for dec_ref: #{key}")
@log.warn("Not found key for dec_ref_by_value: #{key}")
end
end
end
Expand Down Expand Up @@ -765,22 +766,12 @@ def standby?
end

def verify_connection
sock = connect

begin
connect do |sock|
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
end
rescue
if @keepalive
@socket_cache.revoke
end
ensure
unless @keepalive
sock.close
end
end
end

Expand Down Expand Up @@ -858,14 +849,12 @@ def send_data(tag, chunk)
end

if @sender.require_ack_response
if @keepalive
# to identify sock can't be closed
@socket_cache.inc_ref
end
return sock # to read ACK from socket
end

unless @keepalive
if @keepalive
@socket_cache.dec_ref
else
sock.close_write rescue nil
sock.close rescue nil
end
Expand Down Expand Up @@ -1086,7 +1075,16 @@ def connect(host = nil)
end

if block_given?
yield(sock)
begin
yield(sock)
rescue
@socket_cache.revoke(sock) if @keepalive
raise
else
@socket_cache.dec_ref(sock) if @keepalive
ensure
sock.close unless @keepalive
end
else
sock
end
Expand Down
12 changes: 6 additions & 6 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1044,21 +1044,21 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
assert_equal(1, c.fetch_or { sock.open })
end

sub_test_case 'inc_ref' do
sub_test_case 'dec_ref' do
test 'when value exists in active_socks' do
c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
c.fetch_or { 1 }
c.inc_ref
c.dec_ref

assert_equal(1, c.instance_variable_get(:@active_socks)[Thread.current.object_id].ref)
assert_equal(0, c.instance_variable_get(:@active_socks)[Thread.current.object_id].ref)
end

test 'when value exists in inactive_socks' do
c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
c.fetch_or { 1 }
c.revoke
c.inc_ref
assert_equal(1, c.instance_variable_get(:@inactive_socks)[Thread.current.object_id].ref)
c.dec_ref
assert_equal(-1, c.instance_variable_get(:@inactive_socks)[Thread.current.object_id].ref)
end
end

Expand All @@ -1068,7 +1068,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
c.fetch_or { 1 }
c.dec_ref_by_value(1)

assert_equal(-1, c.instance_variable_get(:@active_socks)[Thread.current.object_id].ref)
assert_equal(0, c.instance_variable_get(:@active_socks)[Thread.current.object_id].ref)
end

test 'when value exists in inactive_socks' do
Expand Down

0 comments on commit f892f42

Please sign in to comment.