Resolve 413 Payload Too Large #786

Closed · wants to merge 3 commits
5 changes: 5 additions & 0 deletions lib/logstash/outputs/elasticsearch/common.rb
@@ -313,6 +313,11 @@ def safe_bulk(actions)
# Even though we retry the user should be made aware of these
if e.response_code == 429
logger.debug(message, log_hash)
# 413 means the request was too large, so try to decrease the request size if there
# actually were multiple actions in the bulk request.
elsif e.response_code == 413 && e.num_actions_in_request > 1
@client.target_bulk_bytes /= 2
logger.info(message + " and decreased request size", log_hash)
else
logger.error(message, log_hash)
end
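The rescue branch above is the heart of the fix: a 413 only makes sense to retry if the rejected bulk can be split, so the halving is gated on num_actions_in_request > 1 and a single oversized action still falls through to logger.error. Below is a minimal, self-contained Ruby sketch of that decision; FakeClient and BadResponse are hypothetical stand-ins for the plugin's real client and error classes.

BadResponse = Struct.new(:response_code, :num_actions_in_request)

class FakeClient
  attr_accessor :target_bulk_bytes
  def initialize
    @target_bulk_bytes = 20 * 1024 * 1024 # same 20 MiB starting point as the plugin
  end
end

# Mirrors the decision made by the new elsif branch in safe_bulk.
def handle_bulk_error(client, e)
  if e.response_code == 429
    :debug_and_retry # transient back-pressure from ES
  elsif e.response_code == 413 && e.num_actions_in_request > 1
    client.target_bulk_bytes /= 2 # the next flush cuts over to smaller requests
    :info_and_retry
  else
    :log_error # includes a 413 for a single action, which splitting cannot shrink
  end
end

client = FakeClient.new
p handle_bulk_error(client, BadResponse.new(413, 50)) # => :info_and_retry
p client.target_bulk_bytes                            # => 10485760 (halved)
p handle_bulk_error(client, BadResponse.new(413, 1))  # => :log_error

One caveat worth flagging in review: repeated 413s keep halving without a lower bound, so a floor on target_bulk_bytes (or a cap on retries) may be worth adding.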
42 changes: 22 additions & 20 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -8,23 +8,11 @@
require 'stringio'

module LogStash; module Outputs; class ElasticSearch;
# This is a constant instead of a config option because
# there really isn't a good reason to configure it.
#
# The criteria used are:
# 1. We need a number that's less than 100MiB because ES
# won't accept bulks larger than that.
# 2. It must be large enough to amortize the connection constant
# across multiple requests.
# 3. It must be small enough that even if multiple threads hit this size
# we won't use a lot of heap.
#
# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
# made sense. We picked one on the lowish side to not use too much heap.
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
attr_accessor :target_bulk_bytes

# This is here in case we use DEFAULT_OPTIONS in the future
# DEFAULT_OPTIONS = {
# :setting => value
@@ -65,6 +53,18 @@ def initialize(options={})
# mutex to prevent requests and sniffing to access the
# connection pool at the same time
@bulk_path = @options[:bulk_path]

# The criteria for deciding the initial value of target_bulk_bytes is:
# 1. We need a number that's less than 100MiB because ES
# won't accept bulks larger than that.
# 2. It must be large enough to amortize the connection constant
# across multiple requests.
# 3. It must be small enough that even if multiple threads hit this size
# we won't use a lot of heap.
#
# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
# made sense. We picked one on the lowish side to not use too much heap.
@target_bulk_bytes ||= 20 * 1024 * 1024 # 20MiB
end

def build_url_template
@@ -93,7 +93,6 @@ def maximum_seen_major_version
def bulk(actions)
@action_count ||= 0
@action_count += actions.size

return if actions.empty?

bulk_actions = actions.collect do |action, args, source|
@@ -114,18 +113,21 @@ def bulk(actions)
stream_writer = body_stream
end
bulk_responses = []
actions_in_bulk = 0
bulk_actions.each do |action|
as_json = action.is_a?(Array) ?
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)
as_json << "\n"
if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
if (body_stream.size + as_json.bytesize) > @target_bulk_bytes
bulk_responses << bulk_send(body_stream, actions_in_bulk) unless body_stream.size == 0
actions_in_bulk = 0
end
stream_writer.write(as_json)
# Count an action only after it has been written, so the count passed to
# bulk_send matches the actions actually contained in that request.
actions_in_bulk += 1
end
stream_writer.close if http_compression
bulk_responses << bulk_send(body_stream) if body_stream.size > 0
bulk_responses << bulk_send(body_stream, actions_in_bulk) if body_stream.size > 0
body_stream.close if !http_compression
join_bulk_responses(bulk_responses)
end
@@ -137,7 +139,7 @@ def join_bulk_responses(bulk_responses)
}
end

def bulk_send(body_stream)
def bulk_send(body_stream, num_actions_in_request)
params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
# Discard the URL
response = @pool.post(@bulk_path, params, body_stream.string)
@@ -151,7 +153,7 @@
if response.code != 200
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
response.code, url, body_stream.to_s, response.body
response.code, url, body_stream.to_s, response.body, num_actions_in_request
)
end

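Downstream of that halving, the accumulate-and-flush loop in bulk above is what actually produces smaller requests: each serialized action is appended to body_stream, and the stream is flushed through bulk_send whenever the next action would push it past @target_bulk_bytes. The sketch below reproduces that pattern standalone; plain strings stand in for serialized bulk actions, and a deliberately tiny 32-byte target forces early flushes.

target_bulk_bytes = 32 # tiny threshold so the sketch flushes quickly
batches = []           # each entry: [payload, action_count]
buffer = +""
actions_in_buffer = 0

%w[alpha beta gamma delta epsilon zeta].each do |doc|
  line = doc + "\n"
  # Flush first if this line would overflow the target, mirroring the
  # size check in HttpClient#bulk; a single oversized doc still goes
  # out alone because an empty buffer is never flushed.
  if (buffer.bytesize + line.bytesize) > target_bulk_bytes && !buffer.empty?
    batches << [buffer.dup, actions_in_buffer]
    buffer.clear
    actions_in_buffer = 0
  end
  buffer << line
  actions_in_buffer += 1
end
batches << [buffer.dup, actions_in_buffer] unless buffer.empty?

batches.each { |payload, n| puts "#{n} action(s), #{payload.bytesize} bytes" }

Incrementing the counter only after the write is what keeps the count handed to each flush equal to the number of actions actually in that request.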
5 changes: 3 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -2,13 +2,14 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
class Pool
class NoConnectionAvailableError < Error; end
class BadResponseCodeError < Error
attr_reader :url, :response_code, :request_body, :response_body
attr_reader :url, :response_code, :request_body, :response_body, :num_actions_in_request

def initialize(response_code, url, request_body, response_body)
def initialize(response_code, url, request_body, response_body, num_actions_in_request = 1)
@response_code = response_code
@url = url
@request_body = request_body
@response_body = response_body
@num_actions_in_request = num_actions_in_request
end

def message
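One design note on the change above: defaulting num_actions_in_request to 1 keeps every existing four-argument raise site working unchanged, while bulk_send can opt in with the real count. A quick illustration, assuming a simplified Error base class in place of the plugin's own:

class Error < StandardError; end

class BadResponseCodeError < Error
  attr_reader :url, :response_code, :request_body, :response_body, :num_actions_in_request

  def initialize(response_code, url, request_body, response_body, num_actions_in_request = 1)
    @response_code = response_code
    @url = url
    @request_body = request_body
    @response_body = response_body
    @num_actions_in_request = num_actions_in_request
  end
end

# Old four-argument call sites keep working and report a count of 1 ...
legacy = BadResponseCodeError.new(404, "http://es:9200/_bulk", "{}", "not found")
p legacy.num_actions_in_request # => 1

# ... while bulk_send can attach the real action count to a 413.
rejected = BadResponseCodeError.new(413, "http://es:9200/_bulk", "...", "too large", 250)
p rejected.num_actions_in_request # => 250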
9 changes: 4 additions & 5 deletions spec/integration/outputs/index_spec.rb
@@ -1,8 +1,7 @@
require_relative "../../../spec/es_spec_helper"
require "logstash/outputs/elasticsearch"

describe "TARGET_BULK_BYTES", :integration => true do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
describe "target_bulk_bytes", :integration => true do
let(:event_count) { 1000 }
let(:events) { event_count.times.map { event }.to_a }
let(:config) {
@@ -22,11 +21,11 @@
end

describe "batches that are too large for one" do
let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) }
let(:event) { LogStash::Event.new("message" => "a " * (((subject.client.target_bulk_bytes/2) / event_count)+1)) }

it "should send in two batches" do
expect(subject.client).to have_received(:bulk_send).twice do |payload|
expect(payload.size).to be <= target_bulk_bytes
expect(payload.size).to be <= subject.client.target_bulk_bytes
end
end

@@ -37,7 +36,7 @@

it "should send in one batch" do
expect(subject.client).to have_received(:bulk_send).once do |payload|
expect(payload.size).to be <= target_bulk_bytes
expect(payload.size).to be <= subject.client.target_bulk_bytes
end
end
end
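A note on the sizing arithmetic behind the two-batch expectation above: each message is built from ((target_bulk_bytes / 2) / event_count) + 1 repetitions of the two-byte string "a ", so with the 20 MiB default the raw messages alone land just past the target, and the JSON envelope around each event pushes the payload further over, forcing exactly two flushes. A quick check, assuming the 20 MiB default:

target_bulk_bytes = 20 * 1024 * 1024 # 20_971_520 bytes
event_count = 1000
message = "a " * (((target_bulk_bytes / 2) / event_count) + 1)

puts message.bytesize               # => 20972 bytes per message
puts message.bytesize * event_count # => 20972000, already past 20_971_520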
12 changes: 5 additions & 7 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -190,14 +190,13 @@
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
]}

context "if a message is over TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message) { "a" * (target_bulk_bytes + 1) }
context "if a message is over target_bulk_bytes" do
let(:message) { "a" * (subject.target_bulk_bytes + 1) }

it "should be handled properly" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once do |data|
expect(data.size).to be > target_bulk_bytes
expect(data.size).to be > subject.target_bulk_bytes
end
s = subject.send(:bulk, actions)
end
@@ -216,9 +215,8 @@
s = subject.send(:bulk, actions)
end

context "if one exceeds TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message1) { "a" * (target_bulk_bytes + 1) }
context "if one exceeds target_bulk_bytes" do
let(:message1) { "a" * (subject.target_bulk_bytes + 1) }
it "executes two bulk_send operations" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).twice