Skip to content

Commit 272bc85

Browse files
authored
Merge pull request #2393 from ganmacs/keepalive-for-out_forward
Keepalive for out_forward plugin
2 parents add2af9 + d3cb120 commit 272bc85

File tree

2 files changed

+436
-15
lines changed

2 files changed

+436
-15
lines changed

lib/fluent/plugin/out_forward.rb

Lines changed: 227 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ class ConnectionClosedError < Error; end
3636
desc 'The transport protocol.'
3737
config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
3838
# TODO: TLS session cache/tickets
39-
# TODO: Connection keepalive
4039

4140
desc 'The timeout time when sending event logs.'
4241
config_param :send_timeout, :time, default: 60
@@ -103,6 +102,10 @@ class ConnectionClosedError < Error; end
103102
config_param :tls_client_private_key_path, :string, default: nil
104103
desc 'The client private key passphrase for TLS.'
105104
config_param :tls_client_private_key_passphrase, :string, default: nil, secret: true
105+
desc "Enable keepalive connection."
106+
config_param :keepalive, :bool, default: false
107+
desc "Expired time of keepalive. Default value is nil, which means to keep connection as long as possible"
108+
config_param :keepalive_timeout, :time, default: nil
106109

107110
config_section :security, required: false, multi: false do
108111
desc 'The hostname'
@@ -151,6 +154,7 @@ def initialize
151154
@usock = nil
152155
@sock_ack_waiting = nil
153156
@sock_ack_waiting_mutex = nil
157+
@keep_alive_watcher_interval = 5 # TODO
154158
end
155159

156160
def configure(conf)
@@ -201,9 +205,9 @@ def configure(conf)
201205

202206
log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
203207
if @heartbeat_type == :none
204-
@nodes << NoneHeartbeatNode.new(self, server, failure: failure)
208+
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout)
205209
else
206-
node = Node.new(self, server, failure: failure)
210+
node = Node.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout)
207211
begin
208212
node.validate_host_resolution!
209213
rescue => e
@@ -227,6 +231,10 @@ def configure(conf)
227231
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
228232
end
229233

234+
if !@keepalive && @keepalive_timeout
235+
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
236+
end
237+
230238
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
231239
end
232240

@@ -279,13 +287,21 @@ def start
279287
end
280288
end
281289
end
290+
291+
if @keepalive && @keepalive_timeout
292+
timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
293+
end
282294
end
283295

284296
# Releases the sockets held by this plugin, then delegates to the parent
# implementation via `super`.
def close
  if @usock
    # close socket and ignore errors: this socket will not be used anyway.
    @usock.close rescue nil
  end

  # Each node builds a socket cache whenever keepalive is enabled, whether or
  # not keepalive_timeout is set, so the cached sockets must be released in
  # both cases; otherwise they stay open after the plugin is closed.
  @nodes.each(&:clear) if @keepalive
  super
end
291307

@@ -453,6 +469,10 @@ def on_heartbeat(sockaddr, msg)
453469
end
454470
end
455471

472+
# Asks every node to purge its obsolete keepalive sockets.
# Returns the node list, as `each` does.
def on_purge_obsolete_socks
  @nodes.each { |node| node.purge_obsolete_socks }
end
475+
456476
# return chunk id to be committed
457477
def read_ack_from_sock(sock, unpacker)
458478
begin
@@ -487,8 +507,13 @@ def read_ack_from_sock(sock, unpacker)
487507
log.error "unexpected error while receiving ack message", error: e
488508
log.error_backtrace
489509
ensure
490-
info.sock.close_write rescue nil
491-
info.sock.close rescue nil
510+
if @keepalive
511+
info.node.socket_cache.dec_ref_by_value(info.sock)
512+
else
513+
info.sock.close_write rescue nil
514+
info.sock.close rescue nil
515+
end
516+
492517
@sock_ack_waiting_mutex.synchronize do
493518
@sock_ack_waiting.delete(info)
494519
end
@@ -516,6 +541,9 @@ def ack_reader
516541
# (2) the node does support sending response but responses have not arrived for some reasons.
517542
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
518543
info.node.disable!
544+
if @keepalive
545+
info.node.socket_cache.revoke_by_value(info.sock)
546+
end
519547
info.sock.close rescue nil
520548
rollback_write(info.chunk_id, update_retry: false)
521549
else
@@ -541,7 +569,142 @@ def ack_reader
541569
end
542570

543571
class Node
544-
def initialize(sender, server, failure:)
572+
# Cache of keepalive sockets. Entries are stored under a key that defaults to
# the calling thread's object_id. Each entry carries a reference count and an
# optional expiry time. "Active" entries are handed out by #fetch_or;
# "inactive" entries are revoked or expired ones waiting to be closed by
# #purge_obsolete_socks. All public methods take the internal mutex.
class SocketCache
  # timeout: expiry Time, or nil when the entry never expires
  # sock:    the cached socket object
  # ref:     count of users that fetched the socket and have not released it yet
  TimedSocket = Struct.new(:timeout, :sock, :ref)

  # @param timeout [Numeric, nil] lifetime added to Time.now for each new socket; nil disables expiry
  # @param log [#debug, #warn] logger used for diagnostics
  def initialize(timeout, log)
    @log = log
    @timeout = timeout
    @active_socks = {}
    @inactive_socks = {}
    @mutex = Mutex.new
  end

  # Moves the active entry stored under `key` to the inactive set and resets
  # its reference count to 0 so that the next purge closes it.
  def revoke(key = Thread.current.object_id)
    @mutex.synchronize do
      if @active_socks[key]
        @inactive_socks[key] = @active_socks.delete(key)
        @inactive_socks[key].ref = 0
      end
    end
  end

  # Closes (ignoring errors) and forgets every cached socket, inactive and active.
  def clear
    @mutex.synchronize do
      @inactive_socks.each_value do |s|
        s.sock.close rescue nil
      end
      @inactive_socks.clear

      @active_socks.each_value do |s|
        s.sock.close rescue nil
      end
      @active_socks.clear
    end
  end

  # Closes inactive sockets nobody references anymore, then demotes expired,
  # unreferenced active sockets to inactive (they are closed by a later purge).
  def purge_obsolete_socks
    @mutex.synchronize do
      # Iterate over a snapshot of the keys because entries are deleted inside the loop.
      @inactive_socks.keys.each do |k|
        # 0 means sockets stored in this class received all acks
        if @inactive_socks[k].ref <= 0
          s = @inactive_socks.delete(k)
          s.sock.close rescue nil
          @log.debug("purged obsolete socket #{s.sock}")
        end
      end

      @active_socks.keys.each do |k|
        if expired?(k) && @active_socks[k].ref <= 0
          @inactive_socks[k] = @active_socks.delete(k)
        end
      end
    end
  end

  # Returns the socket cached under `key`, creating it with the given block
  # when there is none or when the cached one has expired. The entry's
  # reference count is incremented on every call.
  #
  # We expect that `yield` returns a unique object in this class
  def fetch_or(key = Thread.current.object_id)
    @mutex.synchronize do
      unless @active_socks[key]
        @active_socks[key] = TimedSocket.new(timeout, yield, 1)
        @log.debug("connect new socket #{@active_socks[key]}")
        return @active_socks[key].sock
      end

      if expired?(key)
        # Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
        # NOTE(review): an inactive entry already stored under the same key is
        # overwritten here without being closed — confirm whether that can occur.
        @inactive_socks[key] = @active_socks.delete(key)
        @log.debug("connection #{@inactive_socks[key]} is expired. reconnecting...")
        # Starts at 0 because the shared increment below accounts for this caller.
        @active_socks[key] = TimedSocket.new(timeout, yield, 0)
      end

      @active_socks[key].ref += 1
      @active_socks[key].sock
    end
  end

  # Decrements the reference count of the entry stored under `key`, looking
  # at active entries first and inactive ones second. Warns when not found.
  def dec_ref(key = Thread.current.object_id)
    @mutex.synchronize do
      if @active_socks[key]
        @active_socks[key].ref -= 1
      elsif @inactive_socks[key]
        @inactive_socks[key].ref -= 1
      else
        @log.warn("Not found key for dec_ref: #{key}")
      end
    end
  end

  # Same as #dec_ref, but locates the entry by its socket object instead of by key.
  #
  # This method is expected to be called in class which doesn't call #inc_ref
  def dec_ref_by_value(val)
    @mutex.synchronize do
      _, entry = @active_socks.find { |_, v| v.sock == val } ||
                 @inactive_socks.find { |_, v| v.sock == val }
      if entry
        entry.ref -= 1
        return
      end

      # No key exists in this branch, so report the socket that was looked up.
      @log.warn("Not found key for dec_ref_by_value: #{val}")
    end
  end

  # Same as #revoke, but locates the active entry by its socket object.
  #
  # This method is expected to be called in class which doesn't call #fetch_or
  def revoke_by_value(val)
    @mutex.synchronize do
      sock = @active_socks.find { |_, v| v.sock == val }
      if sock
        key = sock.first
        @inactive_socks[key] = @active_socks.delete(key)
        @inactive_socks[key].ref = 0
      else
        @log.debug("Not found for revoke_by_value :#{val}")
      end
    end
  end

  private

  # Expiry time for a socket created now, or nil when no timeout is configured.
  def timeout
    @timeout && Time.now + @timeout
  end

  # This method is thread unsafe
  # (callers must already hold @mutex, and `key` must be present in @active_socks)
  def expired?(key = Thread.current.object_id)
    @active_socks[key].timeout ? @active_socks[key].timeout < Time.now : false
  end
end
704+
705+
# @param keepalive [Bool]
706+
# @param keepalive_timeout [Integer | nil]
707+
def initialize(sender, server, failure:, keepalive: false, keepalive_timeout: nil)
545708
@sender = sender
546709
@log = sender.log
547710
@compress = sender.compress
@@ -574,13 +737,19 @@ def initialize(sender, server, failure:)
574737
@resolved_host = nil
575738
@resolved_time = 0
576739
@resolved_once = false
740+
741+
@keepalive = keepalive
742+
if @keepalive
743+
@socket_cache = SocketCache.new(keepalive_timeout, @log)
744+
end
577745
end
578746

579747
attr_accessor :usock
580748

581749
attr_reader :name, :host, :port, :weight, :standby, :state
582750
attr_reader :sockaddr # used by on_heartbeat
583751
attr_reader :failure, :available # for test
752+
attr_reader :socket_cache # for ack
584753

585754
RequestInfo = Struct.new(:state, :shared_key_nonce, :auth)
586755

@@ -601,15 +770,12 @@ def standby?
601770
end
602771

603772
# Opens a connection through #connect and, when the sender has a security
# section, runs the handshake on it. Raises if the handshake does not leave
# the request in the :established state.
def verify_connection
  connect do |sock|
    initial_state = @sender.security ? :helo : :established
    ri = RequestInfo.new(initial_state)
    unless ri.state == :established
      establish_connection(sock, ri)
      raise unless ri.state == :established
    end
  end
end
615781

@@ -675,24 +841,44 @@ def send_data_actual(sock, tag, chunk)
675841
end
676842

677843
# Sends one chunk to this node.
#
# Returns the socket when the sender requires an ack response, so the caller
# can read the ACK from it; otherwise releases the socket, records a
# heartbeat and returns nil. Any send error is re-raised after the socket
# has been revoked (keepalive) or closed.
def send_data(tag, chunk)
  sock = connect

  begin
    send_data_actual(sock, tag, chunk)
  rescue
    @socket_cache.revoke if @keepalive
    (sock.close rescue nil) unless @keepalive
    raise
  end

  return sock if @sender.require_ack_response # to read ACK from socket

  if @keepalive
    @socket_cache.dec_ref
  else
    sock.close_write rescue nil
    sock.close rescue nil
  end
  heartbeat(false)
  nil
end
695870

871+
# Clears this node's socket cache when keepalive is enabled. When it is not,
# returns the falsy @keepalive value and does nothing.
def clear
  return @keepalive unless @keepalive

  @socket_cache.clear
end
874+
875+
# Purges obsolete sockets from this node's socket cache.
#
# Raises RuntimeError when keepalive is disabled, since the socket cache is
# only used in keepalive mode.
def purge_obsolete_socks
  raise "Do not call this method without keepalive option" unless @keepalive

  @socket_cache.purge_obsolete_socks
end
881+
696882
# FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
697883
def send_heartbeat
698884
begin
@@ -708,7 +894,7 @@ def send_heartbeat
708894

709895
case @sender.heartbeat_type
710896
when :transport
711-
@sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
897+
connect(dest_addr) do |sock|
712898
## don't send any data to not cause a compatibility problem
713899
# sock.write FORWARD_TCP_HEARTBEAT_DATA
714900

@@ -883,6 +1069,32 @@ def on_read(sock, ri, data)
8831069
raise "BUG: unknown session state: #{ri.state}"
8841070
end
8851071
end
1072+
1073+
private
1074+
1075+
# Obtains a transfer socket for this node: from the socket cache when
# keepalive is enabled, freshly created otherwise.
#
# Without a block, returns the socket and leaves releasing it to the caller.
# With a block, yields the socket and then releases it: in keepalive mode the
# cache reference is dropped on success, or the socket is revoked if the
# block raises (the error is re-raised); otherwise the socket is closed.
#
# @param host [String, nil] destination address; falls back to resolved_host
def connect(host = nil)
  sock = if @keepalive
           @socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
         else
           @log.debug('connect new socket')
           @sender.create_transfer_socket(host || resolved_host, port, @hostname)
         end

  return sock unless block_given?

  begin
    yield(sock)
  rescue
    # The cache is keyed by the current thread, not by the socket, so the
    # default key must be used. Passing the socket as the key never matches
    # an entry, leaving the broken socket in the cache.
    @socket_cache.revoke if @keepalive
    raise
  else
    # Same keying rule: release the reference taken by fetch_or above.
    @socket_cache.dec_ref if @keepalive
  ensure
    sock.close unless @keepalive
  end
end
8861098
end
8871099

8881100
# Override Node to disable heartbeat

0 commit comments

Comments
 (0)