diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index e3b0a60c75..bd2ea83e5b 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -36,6 +36,10 @@ class TcpInput < Input desc "The field name of the client's address." config_param :source_address_key, :string, default: nil + # Setting default to nil for backward compatibility + desc "The max bytes of message." + config_param :message_length_limit, :size, default: nil + config_param :blocking_timeout, :time, default: 0.5 desc 'The payload is read up to this character.' @@ -102,6 +106,7 @@ def start log.info "listening tcp socket", bind: @bind, port: @port del_size = @delimiter.length + discard_till_next_delimiter = false if @_extract_enabled && @_extract_tag_key server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn| unless check_client(conn) @@ -116,6 +121,16 @@ def start msg = buf[pos...i] pos = i + del_size + if discard_till_next_delimiter + discard_till_next_delimiter = false + next + end + + if !@message_length_limit.nil? && @message_length_limit < msg.bytesize + log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32] + next + end + @parser.parse(msg) do |time, record| unless time && record log.warn "pattern not matched", message: msg @@ -131,6 +146,15 @@ def start end end buf.slice!(0, pos) if pos > 0 + # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit. + # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes. + if !@message_length_limit.nil? && @message_length_limit < buf.bytesize + log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32] + buf.clear + # We should discard the subsequent data until the next delimiter comes. + discard_till_next_delimiter = true + next + end end else server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn| @@ -147,6 +171,16 @@ def start msg = buf[pos...i] pos = i + del_size + if discard_till_next_delimiter + discard_till_next_delimiter = false + next + end + + if !@message_length_limit.nil? && @message_length_limit < msg.bytesize + log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32] + next + end + @parser.parse(msg) do |time, record| unless time && record log.warn "pattern not matched", message: msg @@ -161,6 +195,15 @@ def start end router.emit_stream(@tag, es) buf.slice!(0, pos) if pos > 0 + # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit. + # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes. + if !@message_length_limit.nil? && @message_length_limit < buf.bytesize + log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32] + buf.clear + # We should discard the subsequent data until the next delimiter comes. + discard_till_next_delimiter = true + next + end end end end diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index 75fa931cc1..c1d917332e 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -253,4 +253,76 @@ def create_tcp_socket(host, port, &block) assert_equal 'hello', event[2]['msg'] end end + + sub_test_case "message_length_limit" do + data("batch_emit", { extract: "" }, keep: true) + data("single_emit", { extract: "\ntag_key tag\n\n" }, keep: true) + test "drop records exceeding limit" do |data| + message_length_limit = 10 + d = create_driver(base_config + %! + message_length_limit #{message_length_limit} + + @type none + + #{data[:extract]} + !) + d.run(expect_records: 2, timeout: 10) do + create_tcp_socket('127.0.0.1', @port) do |sock| + sock.send("a" * message_length_limit + "\n", 0) + sock.send("b" * (message_length_limit + 1) + "\n", 0) + sock.send("c" * (message_length_limit - 1) + "\n", 0) + end + end + + expected_records = [ + "a" * message_length_limit, + "c" * (message_length_limit - 1) + ] + actual_records = d.events.collect do |event| + event[2]["message"] + end + + assert_equal expected_records, actual_records + end + + test "clear buffer and discard the subsequent data until the next delimiter" do |data| + message_length_limit = 12 + d = create_driver(base_config + %! + message_length_limit #{message_length_limit} + delimiter ";" + + @type json + + #{data[:extract]} + !) + d.run(expect_records: 1, timeout: 10) do + create_tcp_socket('127.0.0.1', @port) do |sock| + sock.send('{"message":', 0) + sock.send('"hello', 0) + sleep 1 # To make the server read data and clear the buffer here. + sock.send('world!"};', 0) # This subsequent data must be discarded so that a parsing failure doesn't occur. + sock.send('{"k":"v"};', 0) # This will succeed to parse. + end + end + + logs = d.logs.collect do |log| + log.gsub(/\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [-+]\d{4} /, "") + end + actual_records = d.events.collect do |event| + event[2] + end + + assert_equal( + { + # Asserting that '[warn]: pattern not matched message="world!\"}"' warning does not occur. + logs: ['[info]: The buffer size exceeds \'message_length_limit\', cleared: limit=12 size=17 head="{\"message\":\"hello"' + "\n"], + records: [{"k" => "v"}], + }, + { + logs: logs[1..], + records: actual_records, + } + ) + end + end end