From 9cd7d38425ad1eb200cd45cceb8c9ba7ab442b99 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 22 Jul 2019 18:14:17 +0900 Subject: [PATCH] delete unused method and move to private Signed-off-by: Yuta Iwama --- lib/fluent/plugin/out_forward/ack_handler.rb | 56 +++++++++----------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/lib/fluent/plugin/out_forward/ack_handler.rb b/lib/fluent/plugin/out_forward/ack_handler.rb index 3c299145c3..cf7e4d952d 100644 --- a/lib/fluent/plugin/out_forward/ack_handler.rb +++ b/lib/fluent/plugin/out_forward/ack_handler.rb @@ -82,6 +82,31 @@ def ack_reader(select_interval) end end + ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do + def expired?(now) + time + timeout < now + end + end + + Ack = Struct.new(:id, :handler, :node) do + def enqueue(sock) + handler.enqueue(node, sock, id) + end + end + + def create_ack(id, node) + Ack.new(id, self, node) + end + + def enqueue(node, sock, cid) + info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now, @timeout) + @mutex.synchronize do + @ack_waitings << info + end + end + + private + def read_ack_from_sock(sock) begin raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length) @@ -120,37 +145,6 @@ def read_ack_from_sock(sock) delete(info) end - def synchronize - @mutex.synchronize do - yield - end - end - - ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do - def expired?(now) - time + timeout < now - end - end - - Ack = Struct.new(:id, :handler, :node) do - def enqueue(sock) - handler.enqueue(node, sock, id) - end - end - - def create_ack(id, node) - Ack.new(id, self, node) - end - - def enqueue(node, sock, cid) - info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now, @timeout) - @mutex.synchronize do - @ack_waitings << info - end - end - - private - def dump_unique_id_hex(unique_id) Fluent::UniqueId.hex(unique_id) end