Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-174: Provide new implementation for broker dispatch cache #15954

Open
merlimat opened this issue Jun 7, 2022 · 2 comments
Open

PIP-174: Provide new implementation for broker dispatch cache #15954

merlimat opened this issue Jun 7, 2022 · 2 comments
Assignees
Milestone

Comments

@merlimat
Copy link
Contributor

merlimat commented Jun 7, 2022

Motivation

The current implementation of the read cache in the Pulsar broker has largely
remained unchanged for a long time, except for a few minor tweaks.

While the implementation is stable and reasonably efficient for typical workloads,
the overhead required for managing the cache evictions in a broker that is running
many topics can be pretty high in terms of extra CPU utilization and on the JVM
garbage collection to track an increased number of medium-lived objects.

The goal is to provide an alternative implementation that can adapt better to
a wider variety of operating conditions.

Current implementation details

The broker cache is implemented as part of the ManagedLedger component,
which sits in the Pulsar broker and provides a higher level of abstraction of top
of BookKeeper.

Each topic (and managed-ledger) has its own private cache space. This cache is implemented
as a ConcurrentSkipList sorted map that maps (ledgerId, entryId) -> payload. The payload
is a ByteBuf reference that can either be a slice of a ByteBuf that we got
when reading from a socket, or it can be a copied buffer.

Each topic cache is allowed to use the full broker max cache size before an
eviction is triggered. The total cache size is effectively a resource shared across all
the topics, where a topic can use a more prominent portion of it if it "asks for more".

When the eviction happens, we need to do an expensive ranking of all the caches in the broker
and do an eviction in a proportional way to the currently used space for each of them.

The bigger problem is represented by the ConcurrentSkipList and the ByteBuf objects
that need to be tracked. The skip list is essentially like a "tree" structure and needs to
maintain Java objects for each entry in the cache. We also need to potentially have
a huge number of ByteBuf objects.

A cache workload is typically the worst-case scenario for each garbage
collector implementation because it involves creating objects, storing them for some amount of
time and then throwing them away. During that time, the GC would have already tenured these
objects and copy them into an "old generation" space, and sometime later, a costly compaction
of that memory would have to be performed.

To mitigate the effect of the cache workload on the GC, we're being very aggressive in
purging the cache by triggering time-based eviction. By putting a max TTL on the elements in
the cache, we can avoid keeping the objects around for too long to be a problem for the GC.

The reverse side of this is that we're artificially reducing the cache capacity to a very
short time frame, reducing the cache usefulness.

The other problem is the CPU cost involved in doing these frequent evictions, which can
be very high when there are 10s of thousands of topics in a broker.

Proposed changes

Instead of dealing with individual caches for each topic, let's adopt a model where
there is a single cache space for the broker.

This cache is broken into N segments which act as a circular buffer. Whenever a segment
is full, we start writing into the next one, and when we reach the last one, we will
restart recycling the first segment.

This model has been working very well for the BookKeeper ReadCache:
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java

The eviction becomes a completely trivial operation, buffers are just rotated and
overwritten. We don't need to do any per-topic task or keep track of utilization.

Today, there are 2 ways of configuring the cache, one that "copies" data into the cache
and another that will just use reference-counting on the original buffers to avoid
payload copies.

Memory copies into the cache

Each segment is composed of a buffer, an offset, and a hashmap which maps
(ledgerId, entryId) -> offset.

The advantage of this approach is that entries are copied into the cache buffer
(in direct memory), and we don't need to keep any long-lived Java objects around

Keeping reference-counted buffers in the cache

Each segment in the cache will contain a map (ledgerId, entryId) -> ByteBuf.
Buffers will have an increase reference count that will keep the data alive as long
as the buffer is in the cache and it will be released when the cache segment is rotated.

The advantage is we avoid any memory copy when inserting into or reading from the cache.
The disadvantage is that we will have references to all the ByteBuf objects that are in the cache.

API changes

No user-facing API changes are required.

New configuration options

The existing cache implementation will not be removed at this point. Users will
be able to configure the old implementation in broker.conf.

This option will be helpful in case of performance regressions would be seen for
some use cases with the new cache implementation.

@lhotari
Copy link
Member

lhotari commented Jun 14, 2022

btw. The current broker cache seems to be broken for scenarios where there are multiple active cursors (/consumers). That happened in 2.8.2 / 2.9.0 . I have created an issue #16054 .

@github-actions
Copy link

The issue had no activity for 30 days, mark with Stale label.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants