Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic #16494

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from

Conversation

frankvicky
Copy link
Collaborator

I have make a new implementation of ConsumerRecords#records(String) and I want to test this implementation in CI.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@Benchmark
public void records() {
// original one
records.recordsWithNestedList("topic2");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have to consume the result to make sure JVM won't eliminate it for optimization.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will avoid it by using Blackhole

@frankvicky
Copy link
Collaborator Author

Hi @chia7712,

Here are the benchmark results after adding Blackhole to prevent JVM optimization. The new implementation is slightly slower, which indicates that the previous version was indeed being optimized by the JVM. However, the new implementation is still significantly faster than the current one.

# JMH version: 1.37
# VM version: JDK 17.0.11, OpenJDK 64-Bit Server VM, 17.0.11+9-LTS
# VM invoker: /Users/frankvicky/.sdkman/candidates/java/17.0.11-amzn/bin/java
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: org.apache.kafka.jmh.record.ConsumerRecordsBenchmark.recordsWithFilterIterator

Benchmark                                           Mode  Cnt   Score   Error  Units
ConsumerRecordsBenchmark.records                    avgt   10  61.244 ± 0.194  ns/op
ConsumerRecordsBenchmark.recordsWithFilterIterator  avgt   10   2.371 ± 0.046  ns/op
JMH benchmarks done

@frankvicky
Copy link
Collaborator Author

Hello @chia7712 , I have write another implementation which is almost same as original logic. The different of new one is that it filter the TopicPartition in the ConcatenatedIterable to avoid creating double array list.

# JMH version: 1.37
# VM version: JDK 17.0.11, OpenJDK 64-Bit Server VM, 17.0.11+9-LTS
# VM invoker: /Users/frankvicky/.sdkman/candidates/java/17.0.11-amzn/bin/java
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op

# original one
ConsumerRecordsBenchmark.records                    avgt   10  61.881 ± 0.872  ns/op
# latest
ConsumerRecordsBenchmark.records2                   avgt   10   2.206 ± 0.007  ns/op
# filter each records in ConcatenatedIterable
ConsumerRecordsBenchmark.recordsWithFilterIterator  avgt   10   2.344 ± 0.021  ns/op

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for patch. Please remove some unnecessary scenario to cleanup this PR

return Collections.unmodifiableList(recs);
}

public Iterable<ConsumerRecord<K, V>> records(String topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not moving this new method to origin ConsumerRecords?


public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) {
this.iterables = iterables;
}

public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables, Predicate<ConsumerRecord<K, V>> predicate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove this version as it has big performance issue, right?

public Iterable<ConsumerRecord<K, V>> records(String topic) {
if (topic == null)
throw new IllegalArgumentException("Topic must be non-null.");
return new ConcatenatedIterable<>(records.values(), record -> record.topic().equals(topic));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need to use ConcatenatedIterable. for example:

    public Iterable<ConsumerRecord<K, V>> records(String topic) {
        if (topic == null)
            throw new IllegalArgumentException("Topic must be non-null.");

        return () -> new AbstractIterator<ConsumerRecord<K, V>>() {
            final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> iter = records.entrySet().iterator();
            Iterator<ConsumerRecord<K, V>> current = null;
            @Override
            protected ConsumerRecord<K, V> makeNext() {
                if (current == null || !current.hasNext()) {
                    while (iter.hasNext()) {
                        Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = iter.next();
                        if (entry.getKey().topic().equals(topic) && !entry.getValue().isEmpty()) {
                            current = entry.getValue().iterator();
                            break;
                        }
                    }
                }
                if (current == null || !current.hasNext()) return allDone();
                return current.next();
            }
        };
    }

@frankvicky
Copy link
Collaborator Author

Hi @chia7712
I have refactor the method, PTAL 🐱

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for this patch


@Benchmark
public void recordsWithFilterIterator(Blackhole blackhole) {
blackhole.consume(records.records("topic2"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please iterate the Iterable since the cost of iteration is important too.

@Benchmark
public void records(Blackhole blackhole) {
// original one
for (ConsumerRecord<Integer, String> record : records.recordsWithNestedList("topic2")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please have two benchmarks: 1) create iterable 2) iterate all records

I assume your approach will have better score in "create iterable" and similar score in "iterate all records"

@frankvicky
Copy link
Collaborator Author

Hi @chia7712
I have both iterate test and init test, PTAL

# JMH version: 1.37
# VM version: JDK 17.0.11, OpenJDK 64-Bit Server VM, 17.0.11+9-LTS
# VM options: <none>
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: org.apache.kafka.jmh.record.ConsumerRecordsBenchmark.recordsByNewImplementation

Benchmark                                                    Mode  Cnt       Score       Error  Units
ConsumerRecordsBenchmark.iteratorRecords                     avgt   10  288547.940 ± 25990.307  ns/op
ConsumerRecordsBenchmark.iteratorRecordsByNewImplementation  avgt   10  266188.726 ± 40392.469  ns/op
ConsumerRecordsBenchmark.records                             avgt   10      61.363 ±     0.084  ns/op
ConsumerRecordsBenchmark.recordsByNewImplementation          avgt   10       1.113 ±     0.004  ns/op

@chia7712
Copy link
Contributor

chia7712 commented Jul 8, 2024

@frankvicky the jmh result is good to me. Could you please adjust the PR to add a subclass of ConsumerRecords? that subclass will use the legacy records(String) and then please rerun the jmh again. Thus, we can have the new impl in the production and the legacy code in jmh for comparison.

@frankvicky
Copy link
Collaborator Author

Hi @chia7712
I have make some refactors based on comment, PTAL 😃

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for this patch

import java.util.List;
import java.util.Map;

public class LegacyConsumerRecords<K, V> extends ConsumerRecords<K, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this to jmh module

@chia7712
Copy link
Contributor

chia7712 commented Jul 8, 2024

@frankvicky please add jmh result (according to latest commit) to the description

@frankvicky
Copy link
Collaborator Author

Hi @chia7712
I have move the subclass to jmh module, and following is the latest benchamrk:

# JMH version: 1.37
# VM version: JDK 17.0.11, OpenJDK 64-Bit Server VM, 17.0.11+9-LTS
# VM options: <none>
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op

Benchmark                                                       Mode  Cnt       Score       Error  Units
ConsumerRecordsBenchmark.iteratorRecords                        avgt   10  553053.483 ± 13412.339  ns/op
ConsumerRecordsBenchmark.iteratorRecordsByLegacyImplementation  avgt   10  498448.180 ± 37297.950  ns/op
ConsumerRecordsBenchmark.records                                avgt   10       1.120 ±     0.004  ns/op
ConsumerRecordsBenchmark.recordsWithLegacyImplementation        avgt   10      61.529 ±     0.391  ns/op
JMH benchmarks done

@frankvicky
Copy link
Collaborator Author

The benchmark of recent commits:

# JMH version: 1.37
# VM version: JDK 17.0.11, OpenJDK 64-Bit Server VM, 17.0.11+9-LTS
# VM options: <none>
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op

Benchmark                                                       Mode  Cnt       Score       Error  Units
ConsumerRecordsBenchmark.iteratorRecords                        avgt   10  544914.518 ± 18116.955  ns/op
ConsumerRecordsBenchmark.iteratorRecordsByLegacyImplementation  avgt   10  536728.635 ± 10104.066  ns/op
ConsumerRecordsBenchmark.records                                avgt   10       1.116 ±     0.003  ns/op
ConsumerRecordsBenchmark.recordsWithLegacyImplementation        avgt   10      60.926 ±     0.027  ns/op
JMH benchmarks done

@frankvicky
Copy link
Collaborator Author

I have increased the number of warmup iterations to make the benchmark results more stable.

# JMH version: 1.37
# VM version: JDK 17.0.11, OpenJDK 64-Bit Server VM, 17.0.11+9-LTS
# VM options: <none>
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 10 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op

Benchmark                                                       Mode  Cnt       Score       Error  Units
ConsumerRecordsBenchmark.iteratorRecords                        avgt   10  553151.074 ±  2273.429  ns/op
ConsumerRecordsBenchmark.iteratorRecordsByLegacyImplementation  avgt   10  566207.722 ± 12791.416  ns/op
ConsumerRecordsBenchmark.records                                avgt   10       1.117 ±     0.002  ns/op
ConsumerRecordsBenchmark.recordsWithLegacyImplementation        avgt   10      61.072 ±     0.026  ns/op

@chia7712
Copy link
Contributor

@frankvicky Could you please revise the topic ?

@frankvicky frankvicky changed the title [TEST] Test new implementation of ConsumerRecords#records(String) in CI [Refactor] Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic Jul 13, 2024
@frankvicky frankvicky changed the title [Refactor] Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic [MINOR] Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic Jul 13, 2024
@chia7712
Copy link
Contributor

@frankvicky could you please rebase code to run CI again?

@chia7712 chia7712 changed the title [MINOR] Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic MINOR: Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic Jul 17, 2024
@chia7712
Copy link
Contributor

@dajac Do you have free cycle to take a look at this PR? It brings a bit performance improvement when the ConsumerRecords have a bunch of partitions.

@frankvicky
Copy link
Collaborator Author

Hi @chia7712
I have merged the latest trunk into it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants