Skip to content

Commit 75fc463

Browse files
committed
Introduce SocketCache for keepalive
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
1 parent 6dfe1f1 commit 75fc463

File tree

2 files changed

+427
-15
lines changed

2 files changed

+427
-15
lines changed

lib/fluent/plugin/out_forward.rb

Lines changed: 218 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ class ConnectionClosedError < Error; end
3636
desc 'The transport protocol.'
3737
config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
3838
# TODO: TLS session cache/tickets
39-
# TODO: Connection keepalive
4039

4140
desc 'The timeout time when sending event logs.'
4241
config_param :send_timeout, :time, default: 60
@@ -155,6 +154,7 @@ def initialize
155154
@usock = nil
156155
@sock_ack_waiting = nil
157156
@sock_ack_waiting_mutex = nil
157+
@keep_alive_watcher_interval = 5 # TODO
158158
end
159159

160160
def configure(conf)
@@ -205,9 +205,9 @@ def configure(conf)
205205

206206
log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
207207
if @heartbeat_type == :none
208-
@nodes << NoneHeartbeatNode.new(self, server, failure: failure)
208+
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: keepalive_timeout)
209209
else
210-
node = Node.new(self, server, failure: failure)
210+
node = Node.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: keepalive_timeout)
211211
begin
212212
node.validate_host_resolution!
213213
rescue => e
@@ -231,8 +231,8 @@ def configure(conf)
231231
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
232232
end
233233

234-
unless @keepalive
235-
log.warn "The value of keepalive_timeout is ignored. if you want to use keepalive, please add to your conf `keepalive: true`."
234+
if !@keepalive && @keepalive_timeout
235+
log.warn 'The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive: true` to your conf.'
236236
end
237237

238238
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
@@ -287,13 +287,21 @@ def start
287287
end
288288
end
289289
end
290+
291+
if @keepalive && @keepalive_timeout
292+
timer_execute(:keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
293+
end
290294
end
291295

292296
def close
293297
if @usock
294298
# close socket and ignore errors: this socket will not be used anyway.
295299
@usock.close rescue nil
296300
end
301+
302+
if @keepalive && @keepalive_timeout
303+
@nodes.each(&:clear)
304+
end
297305
super
298306
end
299307

@@ -458,6 +466,10 @@ def on_heartbeat(sockaddr, msg)
458466
end
459467
end
460468

469+
def on_purge_obsolete_socks
470+
@nodes.each(&:purge_obsolete_socks)
471+
end
472+
461473
# return chunk id to be committed
462474
def read_ack_from_sock(sock, unpacker)
463475
begin
@@ -492,8 +504,13 @@ def read_ack_from_sock(sock, unpacker)
492504
log.error "unexpected error while receiving ack message", error: e
493505
log.error_backtrace
494506
ensure
495-
info.sock.close_write rescue nil
496-
info.sock.close rescue nil
507+
if @keepalive
508+
info.node.socket_cache.dec_ref_by_value(info.sock)
509+
else
510+
info.sock.close_write rescue nil
511+
info.sock.close rescue nil
512+
end
513+
497514
@sock_ack_waiting_mutex.synchronize do
498515
@sock_ack_waiting.delete(info)
499516
end
@@ -521,6 +538,9 @@ def ack_reader
521538
# (2) the node does support sending response but responses have not arrived for some reasons.
522539
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
523540
info.node.disable!
541+
if @keepalive
542+
info.node.socket_cache.revoke_by_value(info.sock)
543+
end
524544
info.sock.close rescue nil
525545
rollback_write(info.chunk_id, update_retry: false)
526546
else
@@ -546,7 +566,140 @@ def ack_reader
546566
end
547567

548568
class Node
549-
def initialize(sender, server, failure:)
569+
class SocketCache
570+
TimedSocket = Struct.new(:timeout, :sock, :ref)
571+
572+
def initialize(timeout, log)
573+
@log = log
574+
@timeout = timeout
575+
@active_socks = {}
576+
@inactive_socks = {}
577+
@mutex = Mutex.new
578+
end
579+
580+
def revoke(key = Thread.current.object_id)
581+
@mutex.synchronize do
582+
if @active_socks[key]
583+
@inactive_socks[key] = @active_socks.delete(key)
584+
@inactive_socks[key].ref = 0
585+
end
586+
end
587+
end
588+
589+
def clear
590+
@mutex.synchronize do
591+
@inactive_socks.values.each do |s|
592+
s.sock.close
593+
end
594+
@inactive_socks.clear
595+
596+
@active_socks.values.each do |s|
597+
s.sock.close
598+
end
599+
@active_socks.clear
600+
end
601+
end
602+
603+
def purge_obsolete_socks
604+
@mutex.synchronize do
605+
@inactive_socks.keys.each do |k|
606+
# 0 means sockets stored in this class received all acks
607+
if @inactive_socks[k].ref <= 0
608+
s = @inactive_socks.delete(k)
609+
s.sock.close
610+
@log.debug("purged obsolete socket #{s.sock}")
611+
end
612+
end
613+
614+
@active_socks.keys.each do |k|
615+
if expired?(k) && @active_socks[k].ref <= 0
616+
@inactive_socks[k] = @active_socks.delete(k)
617+
end
618+
end
619+
end
620+
end
621+
622+
def fetch_or(key = Thread.current.object_id)
623+
@mutex.synchronize do
624+
unless @active_socks[key]
625+
@active_socks[key] = TimedSocket.new(timeout, yield, 0)
626+
@log.debug("connect new socket #{@active_socks[key]}")
627+
return @active_socks[key].sock
628+
end
629+
630+
if expired?(key)
631+
# Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
632+
@inactive_socks[key] = @active_socks.delete(key)
633+
@log.debug("connection #{@inactive_socks[key]} is expired. reconnecting...")
634+
@active_socks[key] = TimedSocket.new(timeout, yield, 0)
635+
end
636+
637+
@active_socks[key].sock
638+
end
639+
end
640+
641+
def inc_ref(key = Thread.current.object_id)
642+
@mutex.synchronize do
643+
if @active_socks[key]
644+
@active_socks[key].ref += 1
645+
elsif @inactive_socks[key]
646+
@inactive_socks[key].ref += 1
647+
else
648+
@log.warn("Not found key for inc_ref: #{key}")
649+
end
650+
end
651+
end
652+
653+
# This method is expected to be called in class which doesn't call #inc_ref
654+
def dec_ref_by_value(val)
655+
@mutex.synchronize do
656+
sock = @active_socks.detect { |_, v| v.sock == val }
657+
if sock
658+
key = sock.first
659+
@active_socks[key].ref -= 1
660+
return
661+
end
662+
663+
sock = @inactive_socks.detect { |_, v| v.sock == val }
664+
if sock
665+
key = sock.first
666+
@inactive_socks[key].ref -= 1
667+
return
668+
else
669+
@log.warn("Not found key for dec_ref: #{key}")
670+
end
671+
end
672+
end
673+
674+
# This method is expected to be called in class which doesn't call #fetch_or
675+
def revoke_by_value(val)
676+
@mutex.synchronize do
677+
sock = @active_socks.detect { |_, v| v.sock == val }
678+
if sock
679+
key = sock.first
680+
@inactive_socks[key] = @active_socks.delete(key)
681+
@inactive_socks[key].ref = 0
682+
else
683+
@log.debug("Not found for revoke_by_value :#{val}")
684+
end
685+
end
686+
end
687+
688+
private
689+
690+
def timeout
691+
@timeout && Time.now + @timeout
692+
end
693+
694+
# This method is thread unsafe
695+
def expired?(key = Thread.current.object_id)
696+
@active_socks[key].timeout ? @active_socks[key].timeout < Time.now : false
697+
end
698+
end
699+
700+
# @param keepalive [Bool]
701+
# @param keepalive_timeout [Integer | nil]
702+
def initialize(sender, server, failure:, keepalive: false, keepalive_timeout: nil)
550703
@sender = sender
551704
@log = sender.log
552705
@compress = sender.compress
@@ -579,13 +732,19 @@ def initialize(sender, server, failure:)
579732
@resolved_host = nil
580733
@resolved_time = 0
581734
@resolved_once = false
735+
736+
@keepalive = keepalive
737+
if @keepalive
738+
@socket_cache = SocketCache.new(keepalive_timeout, @log)
739+
end
582740
end
583741

584742
attr_accessor :usock
585743

586744
attr_reader :name, :host, :port, :weight, :standby, :state
587745
attr_reader :sockaddr # used by on_heartbeat
588746
attr_reader :failure, :available # for test
747+
attr_reader :socket_cache # for ack
589748

590749
RequestInfo = Struct.new(:state, :shared_key_nonce, :auth)
591750

@@ -606,15 +765,22 @@ def standby?
606765
end
607766

608767
def verify_connection
609-
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
768+
sock = connect
769+
610770
begin
611771
ri = RequestInfo.new(@sender.security ? :helo : :established)
612772
if ri.state != :established
613773
establish_connection(sock, ri)
614774
raise if ri.state != :established
615775
end
776+
rescue
777+
if @keepalive
778+
@socket_cache.revoke
779+
end
616780
ensure
617-
sock.close
781+
unless @keepalive
782+
sock.close
783+
end
618784
end
619785
end
620786

@@ -680,24 +846,44 @@ def send_data_actual(sock, tag, chunk)
680846
end
681847

682848
def send_data(tag, chunk)
683-
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
849+
sock = connect
850+
684851
begin
685852
send_data_actual(sock, tag, chunk)
686853
rescue
687-
sock.close rescue nil
854+
if @keepalive
855+
@socket_cache.revoke
856+
end
688857
raise
689858
end
690859

691860
if @sender.require_ack_response
861+
if @keepalive
862+
# to identify sock can't be closed
863+
@socket_cache.inc_ref
864+
end
692865
return sock # to read ACK from socket
693866
end
694867

695-
sock.close_write rescue nil
696-
sock.close rescue nil
868+
unless @keepalive
869+
sock.close_write rescue nil
870+
sock.close rescue nil
871+
end
697872
heartbeat(false)
698873
nil
699874
end
700875

876+
def clear
877+
@keepalive && @socket_cache.clear
878+
end
879+
880+
def purge_obsolete_socks
881+
unless @keepalive
882+
raise "Don not call this method without keepalive option"
883+
end
884+
@socket_cache.purge_obsolete_socks
885+
end
886+
701887
# FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
702888
def send_heartbeat
703889
begin
@@ -713,7 +899,7 @@ def send_heartbeat
713899

714900
case @sender.heartbeat_type
715901
when :transport
716-
@sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
902+
connect(dest_addr) do |sock|
717903
## don't send any data to not cause a compatibility problem
718904
# sock.write FORWARD_TCP_HEARTBEAT_DATA
719905

@@ -888,6 +1074,23 @@ def on_read(sock, ri, data)
8881074
raise "BUG: unknown session state: #{ri.state}"
8891075
end
8901076
end
1077+
1078+
private
1079+
1080+
def connect(host = nil)
1081+
sock = if @keepalive
1082+
@socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
1083+
else
1084+
@log.debug('connect new socket')
1085+
@sender.create_transfer_socket(resolved_host, port, @hostname)
1086+
end
1087+
1088+
if block_given?
1089+
yield(sock)
1090+
else
1091+
sock
1092+
end
1093+
end
8911094
end
8921095

8931096
# Override Node to disable heartbeat

0 commit comments

Comments
 (0)