Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit #16482
Conversation
…er instead of a JRuby array, added more tests to cover some 'buffer full' conditions
A few questions:
- there's quite a bit going on here and this is a feature that is in the critical path; any clue on the performance impact of this PR?
- Since the size checks are done between the buffered data and the first message, it's possible for the codec to output messages larger than the setting if the incoming data never touches the buffer:
jruby-9.4.8.0 :019 > buffer = FileWatch::BufferedTokenizer.new("\n", 5)
=> #<FileWatch::BufferedTokenizer:0xff2f2ae>
jruby-9.4.8.0 :020 > buffer.extract("1234").each { |line| puts line; puts line.length }
=> []
jruby-9.4.8.0 :021 > buffer.extract("5\n0123456789\n").each { |line| puts line; puts line.length }
12345
5
0123456789
10
=> ["12345", "0123456789"]
This is OK if codecs and plugins clearly describe that the setting exposed to the user controls only the "leftover buffer size" and not the maximum message size. This is hard for the user to understand, though: they will likely expect it to be the message size, so they could set the limit to 5MB but still get a 50MB message.
An alternative would be to perform the size check much earlier in extract, maybe right after data.convertToString (this is a half-baked thought); what do you think?
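To make that alternative concrete, here is a minimal Ruby sketch (not the actual BufferedTokenizerExt code; the class name and exception message are illustrative) of checking every produced token right after splitting the incoming data, so that the limit bounds token size rather than only the leftover buffer:

class SketchTokenizer
  def initialize(delimiter = "\n", size_limit = nil)
    @delimiter = delimiter
    @size_limit = size_limit
    @buffer = ""
  end

  # split first, then validate every produced token plus the new leftover
  def extract(data)
    tokens = (@buffer + data).split(@delimiter, -1)
    @buffer = tokens.pop || ""   # trailing fragment stays buffered for the next call
    if @size_limit && (tokens.any? { |t| t.length > @size_limit } || @buffer.length > @size_limit)
      raise "input buffer full"
    end
    tokens
  end
end

# With this placement of the check, the second call from the session above
# raises instead of emitting the 10-byte token:
t = SketchTokenizer.new("\n", 5)
t.extract("1234")             # => []
t.extract("5\n0123456789\n")  # raises "input buffer full"

This sketch re-joins the buffer on every call, so it only shows where the check could live, not how it should be implemented efficiently.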
        throw new IllegalStateException("input buffer full");
    }
    this.inputSize = inputSize + entitiesSize;
}
input.append(entities.shift(context));
if (entities.isEmpty()) {
    headToken.append(input.shift(context)); // remove head
Maybe we can avoid some overhead when we know there's no leftover content at the end of the incoming data:
if (input.getLength() < 2) {
    headToken.append(input.shift(context)); // remove head
    return RubyUtil.RUBY.newArray();
} else {
    if (headToken.length() > 0) {
        headToken.append(input.shift(context)); // append buffer to first element and
        input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array
        headToken = new StringBuilder();
    }
    headToken.append(input.pop(context)); // put the leftovers in headToken for later
    inputSize = headToken.length();
    return input;
}
We could do that, but it complicates the flow even more. Before applying it, we should measure the effective performance gain.
About question 1, I haven't done any performance check on this. However, we should define the use cases to check. The buffer-full condition in the existing implementation at a certain point didn't provide correct results, so if we use the original implementation as the baseline we aren't comparing apples to apples. The only thing we can test is the aggregation process, as long as no token exceeds the size limit.
I can think of (a sketch of these shapes follows the list):
- tokens that are smaller than a full segment
- tokens that span multiple segments
- tokens that are always spread between the end of a preceding segment and the subsequent one.
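As a concrete illustration of those three shapes, here is a small Ruby sketch (assuming a Logstash/JRuby session where FileWatch::BufferedTokenizer is loaded; the segment size and payload contents are arbitrary choices, not taken from the PR):

SEGMENT = 1024

# 1. tokens smaller than a full segment
small = ("a" * 100 + "\n") * 10

# 2. a single token spanning multiple segments, fed in SEGMENT-sized chunks
big_chunks = ("b" * (SEGMENT * 3) + "\n").chars.each_slice(SEGMENT).map(&:join)

# 3. a token spread between the end of one segment and the start of the next
straddle_first  = "c" * 200 + "\n" + "d" * 100   # fragment of the next token ends this segment
straddle_second = "d" * 100 + "\n"               # the same token is completed by the next segment

buffer = FileWatch::BufferedTokenizer.new("\n")
([small] + big_chunks + [straddle_first, straddle_second]).each do |segment|
  buffer.extract(segment).each { |token| token } # a real benchmark would time this loop
end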
On point number 2, this is a bug. The expectation for the client of this tokenizer is that lines (or tokens) bigger than the predefined limit generate an exception, and that the normal flow is able to resume on the next extract call immediately after the token delimiter that marks the end of the offending token. So if the token that should trigger the error is not the first of a segment, it must still raise the error.
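For instance, a sketch of that contract at the Ruby level (reusing the limit from the earlier IRB session; the exception message and the fate of tokens preceding the offending one in the same call are assumptions, not taken from the PR):

buffer = FileWatch::BufferedTokenizer.new("\n", 5)

begin
  # "0123456789" exceeds the 5-byte limit and is not the first token of the
  # segment, yet it is still expected to raise the buffer-full error
  buffer.extract("ok\n0123456789\n")
rescue => e
  puts e.message
end

# the delimiter closing the offending token has already been consumed, so the
# next call resumes the normal flow
buffer.extract("abc\n") # => ["abc"]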
@andsel given my proposal improves performance, should we add it to this PR?
Yes, I missed adding it.
added with 99292ca
Benchmark

Summary

As suggested by the flamegraphs that @jsvd measured, this implementation has a performance loss, and the main suspect is the copying of the split data into the input accumulator array: https://github.com/elastic/logstash/pull/16482/files#diff-5c7f8990e98f54782395d29b4b1b5b68cf6f782b34af5eb8f1b5a77331e0172eR84. With this PR the total consumption of
A variation < 5% could be considered negligible, in particular with the usage of the tokenizer inside an input codec, where the latencies of the network or the locks to access the in-memory queue could have a bigger impact on the overall performance. So we could assume that this PR doesn't significantly influence the overall performance.

Details

Result with performance metering of the
Raw baseline results
Bugfix results
Flamegraphs

5 feeding pipelines feed one pipeline under test that employs the line codec. The loader LS can be configured with these pipeline definitions:

- pipeline.id: test_1
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_2
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_3
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_4
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_5
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"

While the system under test can be executed with:

bin/logstash -e "input { tcp { port => 3333 codec => line } } output { null {} }"

Used the async profiler from https://github.com/async-profiler/async-profiler, running with:

bin/asprof -d 20 -i 10ms -f /tmp/flamegraph.html <LS PID>
💛 Build succeeded, but was flaky
cc @andsel
LGTM
@logstashmachine backport 8.x
…ines bigger then sizeLimit (#16482) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce)
… consume lines in case of lines bigger then sizeLimit (#16569) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce) Co-authored-by: Andrea Selva <selva.andre@gmail.com>
@logstashmachine backport 7.17
…ines bigger then sizeLimit (elastic#16482) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests.
@logstashmachine backport 8.15
…ines bigger then sizeLimit (#16482) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce)
@logstashmachine backport 8.16
…ines bigger then sizeLimit (#16482) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce)
…ines bigger then sizeLimit (#16482) (#16579) Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce) Co-authored-by: Andrea Selva <selva.andre@gmail.com>
…onsume lines in case of lines bigger then sizeLimit (#16577)

* Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit (#16482)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met. Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter. Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array. Furthermore it translates the `buftok_spec` tests into JUnit tests.

* Fixed compilation error due to libraries incompatibilities
  - usage of `data.convertToString().split(context, delimiter, MINUS_ONE);` instead of `data.convertToString().split(delimiter, -1);`
  - avoid to extend BuffererdTokenir test cases from `org.logstash.RubyTestBase` which was introduced in #13159
  - JDK 8 compatibilities:
    - `Arrays.asList` vs `List.of`
    - `assertThrows` method from JUnit5 not available in JUnit4 so reimplemented in the test
Release notes
[rn:skip]
What does this PR do?
Updates BufferedTokenizerExt so that it can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it records this state in a local field so that on the next data segment it can consume all the token fragments up to the next token delimiter.

Updated the accumulation variable from a RubyArray containing strings to a StringBuilder which contains the head token; the remaining token fragments are stored in the input array.

Ports the tests present at logstash/logstash-core/spec/logstash/util/buftok_spec.rb (line 20 in f35e10d).
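A usage-level illustration of that accumulation (a sketch; the return values follow the behaviour described above and the IRB session earlier in this thread, and flush returns whatever is left in the head token):

buffer = FileWatch::BufferedTokenizer.new("\n")
buffer.extract("foo")      # => []               fragment kept as the head token
buffer.extract("bar")      # => []               appended to the same head token
buffer.extract("baz\nqux") # => ["foobarbaz"]    token completed; "qux" becomes the new head token
buffer.flush               # => "qux"            leftover drained at end of stream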
Why is it important/What is the impact to the user?
Fixes the behaviour of the tokenizer so that it works properly when buffer-full conditions are met.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files (and/or docker env variables)

Author's Checklist
How to test this PR locally
Follow the instructions in #16483
Related issues
Use cases
Screenshots
Logs