Skip to content

Commit 528f58e

Browse files
authored
Merge pull request #1169 from mashhurs/non-utf-8-data-handling
Fixes an issue where events containing non-unicode strings could fail to serialize correctly when compression is enabled.
2 parents 80a9c59 + 1aa9ee2 commit 528f58e

File tree

6 files changed

+27
-9
lines changed

6 files changed

+27
-9
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.22.3
2+
- Fixes an issue where events containing non-unicode strings could fail to serialize correctly when compression is enabled [#1169](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1169)
3+
14
## 11.22.2
25
- [DOC] Add content for sending data to Elasticsearch on serverless [#1164](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1164)
36

docs/index.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,11 @@ index level and `monitoring` permissions at cluster level. The `monitoring`
316316
permission at cluster level is necessary to perform periodic connectivity
317317
checks.
318318

319+
[id="plugins-{type}s-{plugin}-handling-non-utf-8"]
320+
==== Handling non UTF-8 data
321+
322+
This plugin transmits events to Elasticsearch using a JSON API, and therefore requires that all string values in events to be valid UTF-8.
323+
When a string value on an event contains one or more byte sequences that are not valid in UTF-8, each offending byte sequence is replaced with the UTF-8 replacement character (`\uFFFD`).
319324

320325
[id="plugins-{type}s-{plugin}-options"]
321326
==== Elasticsearch Output Configuration Options

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module LogStash; module Outputs; class ElasticSearch;
2222
# made sense. We picked one on the lowish side to not use too much heap.
2323
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
2424

25+
2526
class HttpClient
2627
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
2728
# This is here in case we use DEFAULT_OPTIONS in the future
@@ -37,7 +38,7 @@ class HttpClient
3738
# * `:user` - String. The user to use for authentication.
3839
# * `:password` - String. The password to use for authentication.
3940
# * `:timeout` - Float. A duration value, in seconds, after which a socket
40-
# operation or request will be aborted if not yet successfull
41+
# operation or request will be aborted if not yet successful
4142
# * `:client_settings` - a hash; see below for keys.
4243
#
4344
# The `client_settings` key is a has that can contain other settings:
@@ -132,6 +133,9 @@ def bulk(actions)
132133
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
133134
LogStash::Json.dump(action)
134135
as_json << "\n"
136+
137+
as_json.scrub! # ensure generated JSON is valid UTF-8
138+
135139
if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
136140
stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
137141
logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
@@ -496,5 +500,6 @@ def update_action_builder(args, source)
496500
end
497501
[args, source]
498502
end
503+
499504
end
500505
end end end

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.22.2'
3+
s.version = '11.22.3'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/outputs/compressed_indexing_spec.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
[ {"http_compression" => true}, {"compression_level" => 1} ].each do |compression_config|
1212
describe "indexing with http_compression turned on", :integration => true do
1313
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
14+
let(:event_with_invalid_utf_8_bytes) { LogStash::Event.new("message" => "Message from spacecraft which contains \xAC invalid \xD7 byte sequences.", "type" => type) }
15+
1416
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
1517
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
1618
let(:event_count) { 10000 + rand(500) }
17-
let(:events) { event_count.times.map { event }.to_a }
19+
# mix the events with valid and invalid UTF-8 payloads
20+
let(:events) { event_count.times.map { |i| i%3 == 0 ? event : event_with_invalid_utf_8_bytes }.to_a }
1821
let(:config) {
1922
{
2023
"hosts" => get_host_port,

spec/unit/outputs/elasticsearch/http_client_spec.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,14 @@
242242
end
243243
end
244244

245-
context "with two messages" do
246-
let(:message1) { "hey" }
247-
let(:message2) { "you" }
245+
context "with multiple messages" do
246+
let(:message_head) { "Spacecraft message" }
247+
let(:message_tail) { "byte sequence" }
248+
let(:invalid_utf_8_message) { "contains invalid \xAC" }
248249
let(:actions) { [
249-
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
250-
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
250+
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_head}],
251+
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> invalid_utf_8_message}],
252+
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_tail}],
251253
]}
252254
it "executes one bulk_send operation" do
253255
allow(subject).to receive(:join_bulk_responses)
@@ -257,7 +259,7 @@
257259

258260
context "if one exceeds TARGET_BULK_BYTES" do
259261
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
260-
let(:message1) { "a" * (target_bulk_bytes + 1) }
262+
let(:message_head) { "a" * (target_bulk_bytes + 1) }
261263
it "executes two bulk_send operations" do
262264
allow(subject).to receive(:join_bulk_responses)
263265
expect(subject).to receive(:bulk_send).twice

0 commit comments

Comments
 (0)