MINOR: Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic#16494
frankvicky wants to merge 17 commits into apache:trunk from
| @Benchmark | ||
| public void records() { | ||
| // original one | ||
| records.recordsWithNestedList("topic2"); |
You have to consume the result so that the JVM cannot eliminate the call as dead code.
Sure, I will avoid that by using Blackhole.
|
Hi @chia7712, here are the benchmark results after adding |
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
|
Hello @chia7712, I have written another implementation that is almost the same as the original logic. The difference in the new one is that it filters the |
chia7712
left a comment
@frankvicky thanks for the patch. Please remove the unnecessary scenarios to clean up this PR.
| return Collections.unmodifiableList(recs); | ||
| } | ||
|
|
||
| public Iterable<ConsumerRecord<K, V>> records(String topic) { |
Why not move this new method to the original ConsumerRecords?
| this.iterables = iterables; | ||
| } | ||
|
|
||
| public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables, Predicate<ConsumerRecord<K, V>> predicate) { |
Please remove this version, since it has a significant performance issue, right?
| public Iterable<ConsumerRecord<K, V>> records(String topic) { | ||
| if (topic == null) | ||
| throw new IllegalArgumentException("Topic must be non-null."); | ||
| return new ConcatenatedIterable<>(records.values(), record -> record.topic().equals(topic)); |
Maybe we don't need to use ConcatenatedIterable. For example:
    public Iterable<ConsumerRecord<K, V>> records(String topic) {
        if (topic == null)
            throw new IllegalArgumentException("Topic must be non-null.");
        return () -> new AbstractIterator<ConsumerRecord<K, V>>() {
            final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> iter = records.entrySet().iterator();
            Iterator<ConsumerRecord<K, V>> current = null;

            @Override
            protected ConsumerRecord<K, V> makeNext() {
                // Advance to the next non-empty partition of the requested topic.
                if (current == null || !current.hasNext()) {
                    while (iter.hasNext()) {
                        Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = iter.next();
                        if (entry.getKey().topic().equals(topic) && !entry.getValue().isEmpty()) {
                            current = entry.getValue().iterator();
                            break;
                        }
                    }
                }
                if (current == null || !current.hasNext()) return allDone();
                return current.next();
            }
        };
    }
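For anyone who wants to try the idea outside the Kafka codebase, here is a minimal sketch of the same lazy, per-topic iteration using only JDK types. `LazyTopicRecords` and its nested `Partition` class are hypothetical stand-ins for `ConsumerRecords` and `TopicPartition`, and a plain `Iterator` replaces Kafka's `AbstractIterator`. It is an illustration under those assumptions, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for ConsumerRecords, built only on JDK collections.
final class LazyTopicRecords {
    // Hypothetical stand-in for Kafka's TopicPartition.
    static final class Partition {
        final String topic;
        final int id;

        Partition(String topic, int id) {
            this.topic = topic;
            this.id = id;
        }
    }

    // Returns a lazy view: no partition is inspected until the caller iterates.
    static <V> Iterable<V> records(Map<Partition, List<V>> byPartition, String topic) {
        if (topic == null)
            throw new IllegalArgumentException("Topic must be non-null.");
        return () -> new Iterator<V>() {
            private final Iterator<Map.Entry<Partition, List<V>>> partitions =
                    byPartition.entrySet().iterator();
            private Iterator<V> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Skip partitions of other topics and empty partitions.
                while (!current.hasNext() && partitions.hasNext()) {
                    Map.Entry<Partition, List<V>> entry = partitions.next();
                    if (entry.getKey().topic.equals(topic))
                        current = entry.getValue().iterator();
                }
                return current.hasNext();
            }

            @Override
            public V next() {
                if (!hasNext())
                    throw new NoSuchElementException();
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        Map<Partition, List<String>> data = new LinkedHashMap<>();
        data.put(new Partition("topic1", 0), Arrays.asList("a"));
        data.put(new Partition("topic2", 0), Arrays.asList("b", "c"));
        data.put(new Partition("topic2", 1), Collections.emptyList());
        data.put(new Partition("topic2", 2), Arrays.asList("d"));

        List<String> seen = new ArrayList<>();
        for (String value : records(data, "topic2"))
            seen.add(value);
        System.out.println(seen); // prints [b, c, d]
    }
}
```

Calling `records(...)` only captures the map and the topic name; the filtering happens inside `hasNext()`, so the cost is paid during iteration rather than at creation.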
Hi @chia7712 |
chia7712
left a comment
@frankvicky thanks for this patch
|
|
||
| @Benchmark | ||
| public void recordsWithFilterIterator(Blackhole blackhole) { | ||
| blackhole.consume(records.records("topic2")); |
Please iterate over the Iterable, since the cost of iteration matters too.
| @Benchmark | ||
| public void records(Blackhole blackhole) { | ||
| // original one | ||
| for (ConsumerRecord<Integer, String> record : records.recordsWithNestedList("topic2")) { |
Please have two benchmarks: 1) create the iterable, 2) iterate over all records.
I assume your approach will score better in "create iterable" and about the same in "iterate all records".
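To illustrate why the two phases are worth measuring separately, here is a rough sketch that counts how many partitions are inspected at creation time versus during iteration. It is not a JMH benchmark and does not measure time. `CreateVersusIterate`, the `"topic-partition"` string keys, and both the `eager` and `lazy` variants are hypothetical and do not correspond to any implementation in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison of where the work happens: at creation or at iteration.
final class CreateVersusIterate {
    // True when a key such as "topic2-0" belongs to the topic (assumed key format).
    static boolean belongsTo(String key, String topic) {
        return key.substring(0, key.lastIndexOf('-')).equals(topic);
    }

    // Eager variant: scans every partition and copies matches before returning.
    static Iterable<String> eager(Map<String, List<String>> partitions, String topic,
                                  AtomicInteger inspected) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : partitions.entrySet()) {
            inspected.incrementAndGet();
            if (belongsTo(entry.getKey(), topic))
                matches.addAll(entry.getValue());
        }
        return matches;
    }

    // Lazy variant: builds the pipeline only when iterator() is called,
    // and inspects partitions only as elements are pulled.
    static Iterable<String> lazy(Map<String, List<String>> partitions, String topic,
                                 AtomicInteger inspected) {
        return () -> partitions.entrySet().stream()
                .peek(entry -> inspected.incrementAndGet())
                .filter(entry -> belongsTo(entry.getKey(), topic))
                .flatMap(entry -> entry.getValue().stream())
                .iterator();
    }

    public static void main(String[] args) {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        partitions.put("topic1-0", Arrays.asList("a"));
        partitions.put("topic2-0", Arrays.asList("b", "c"));
        partitions.put("topic2-1", Arrays.asList("d"));

        AtomicInteger eagerWork = new AtomicInteger();
        eager(partitions, "topic2", eagerWork);
        System.out.println("eager, after creation: " + eagerWork.get());

        AtomicInteger lazyWork = new AtomicInteger();
        Iterable<String> view = lazy(partitions, "topic2", lazyWork);
        System.out.println("lazy, after creation: " + lazyWork.get());
        for (String ignored : view) { /* drain the view */ }
        System.out.println("lazy, after iteration: " + lazyWork.get());
    }
}
```

In this sketch the eager variant does all of its scanning before it returns, while the lazy variant does none until it is iterated. That difference is what a separate "create iterable" benchmark would expose, with the "iterate all records" benchmark showing the total cost.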
|
Hi @chia7712 |
|
@frankvicky the JMH result looks good to me. Could you please adjust the PR to add a subclass of |
|
Hi @chia7712 |
chia7712
left a comment
@frankvicky thanks for this patch
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| public class LegacyConsumerRecords<K, V> extends ConsumerRecords<K, V> { |
|
@frankvicky please add the JMH results (from the latest commit) to the description |
|
Hi @chia7712 |
|
The benchmark of recent commits: |
|
I have increased the number of warmup iterations to make the benchmark results more stable. |
|
@frankvicky Could you please revise the topic? |
|
@frankvicky could you please rebase the code to run CI again? |
|
@dajac Do you have free cycles to take a look at this PR? It brings a small performance improvement when the |
|
Hi @chia7712 |
I have made a new implementation of ConsumerRecords#records(String) and I want to test this implementation in CI.
Committer Checklist (excluded from commit message)