Motivation
The current implementation of the read cache in the Pulsar broker has largely
remained unchanged for a long time, except for a few minor tweaks.
While the implementation is stable and reasonably efficient for typical workloads,
managing cache evictions in a broker that is running many topics carries a high
overhead, both in extra CPU utilization and in JVM garbage collection pressure
from tracking a larger number of medium-lived objects.
The goal is to provide an alternative implementation that can adapt better to
a wider variety of operating conditions.
Current implementation details
The broker cache is implemented as part of the ManagedLedger component,
which sits in the Pulsar broker and provides a higher level of abstraction on top
of BookKeeper.
Each topic (and managed-ledger) has its own private cache space. This cache is implemented
as a ConcurrentSkipList sorted map that maps (ledgerId, entryId) -> payload. The payload
is a ByteBuf reference that can either be a slice of a ByteBuf that we got
when reading from a socket, or it can be a copied buffer.
Each topic cache is allowed to use the full broker max cache size before an
eviction is triggered. The total cache size is effectively a resource shared across all
the topics, where a topic can use a larger portion of it if it "asks for more".
When the eviction happens, we need to do an expensive ranking of all the caches in the broker
and evict from each of them in proportion to its currently used space.
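To make the cost of this step concrete, here is a minimal sketch of a proportional eviction plan. The class and method names (`ProportionalEviction`, `bytesToEvict`) are hypothetical and are not taken from the Pulsar code base. The sketch leaves out the ranking step and only shows that every eviction pass has to visit every per-topic cache.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: not the actual Pulsar eviction code.
final class ProportionalEviction {
    /**
     * Splits bytesToFree across the caches in proportion to their current size
     * (shares are rounded down). Every call walks all caches, so the cost of
     * one eviction pass grows with the number of topics in the broker.
     */
    static Map<String, Long> bytesToEvict(Map<String, Long> usedBytesByTopic, long bytesToFree) {
        long totalUsed = 0;
        for (long used : usedBytesByTopic.values()) {
            totalUsed += used;
        }
        Map<String, Long> plan = new LinkedHashMap<>();
        if (totalUsed == 0) {
            return plan; // nothing cached, nothing to evict
        }
        for (Map.Entry<String, Long> e : usedBytesByTopic.entrySet()) {
            long share = bytesToFree * e.getValue() / totalUsed;
            // Never ask a cache to evict more than it currently holds.
            plan.put(e.getKey(), Math.min(share, e.getValue()));
        }
        return plan;
    }
}
```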
The bigger problem is the ConcurrentSkipList and the ByteBuf objects
that need to be tracked. The skip list is essentially a "tree"-like structure and needs to
maintain Java objects for each entry in the cache. We also potentially need to keep
a huge number of ByteBuf objects.
A cache workload is typically the worst-case scenario for any garbage
collector implementation because it involves creating objects, storing them for some amount of
time and then throwing them away. During that time, the GC would have already tenured these
objects and copied them into an "old generation" space, and sometime later, a costly compaction
of that memory would have to be performed.
To mitigate the effect of the cache workload on the GC, we're being very aggressive in
purging the cache by triggering time-based eviction. By putting a max TTL on the elements in
the cache, we can avoid keeping the objects around for too long to be a problem for the GC.
The downside of this is that we're artificially reducing the cache capacity to a very
short time frame, reducing the cache usefulness.
The other problem is the CPU cost involved in doing these frequent evictions, which can
be very high when there are tens of thousands of topics in a broker.
Proposed changes
Instead of dealing with individual caches for each topic, let's adopt a model where
there is a single cache space for the broker.
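This cache is broken into N segments which act as a circular buffer. Whenever a segment
is full, we start writing into the next one, and when we reach the last one, we
start over and recycle the first segment.
This model has been working very well for the BookKeeper ReadCache:
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
The eviction becomes a completely trivial operation: buffers are just rotated and
overwritten. We don't need to do any per-topic task or keep track of utilization.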
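The rotation described above can be sketched roughly as follows. This is only an illustration under simplifying assumptions: `SegmentRing`, `EntryKey` and the sizing parameters are hypothetical names, payloads are plain `byte[]` rather than `ByteBuf`, and a single lock replaces whatever concurrency scheme a real implementation would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the rotation logic only; all names are illustrative.
final class SegmentRing {
    record EntryKey(long ledgerId, long entryId) {}

    private final List<Map<EntryKey, byte[]>> segments = new ArrayList<>();
    private final int segmentCapacityBytes;
    private int currentSegment = 0;
    private int usedInCurrent = 0;

    SegmentRing(int segmentCount, int segmentCapacityBytes) {
        this.segmentCapacityBytes = segmentCapacityBytes;
        for (int i = 0; i < segmentCount; i++) {
            segments.add(new HashMap<>());
        }
    }

    synchronized void put(long ledgerId, long entryId, byte[] payload) {
        if (payload.length > segmentCapacityBytes) {
            return; // too large to ever fit in one segment, so it is not cached
        }
        if (usedInCurrent + payload.length > segmentCapacityBytes) {
            // Rotate: advance to the next segment and drop whatever it held.
            // This is the entire eviction; no per-topic bookkeeping is involved.
            currentSegment = (currentSegment + 1) % segments.size();
            segments.get(currentSegment).clear();
            usedInCurrent = 0;
        }
        segments.get(currentSegment).put(new EntryKey(ledgerId, entryId), payload);
        usedInCurrent += payload.length;
    }

    synchronized byte[] get(long ledgerId, long entryId) {
        EntryKey key = new EntryKey(ledgerId, entryId);
        for (Map<EntryKey, byte[]> segment : segments) {
            byte[] payload = segment.get(key);
            if (payload != null) {
                return payload;
            }
        }
        return null; // cache miss
    }
}
```

With two segments of 10 bytes each, inserting three 6-byte entries recycles the first segment on the third insert, so the oldest entry is gone while the two newer ones remain readable.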
Today, there are 2 ways of configuring the cache, one that "copies" data into the cache
and another that will just use reference-counting on the original buffers to avoid
payload copies.
Memory copies into the cache
Each segment is composed of a buffer, an offset, and a hashmap which maps (ledgerId, entryId) -> offset.
The advantage of this approach is that entries are copied into the cache buffer
(in direct memory), and we don't need to keep any long-lived Java objects around.
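A single copying segment could look roughly like the sketch below. It is not the proposed implementation: `CopySegment`, `EntryKey` and `Location` are hypothetical names, and storing the entry length next to the offset is an assumption made here so that reads know how many bytes to return. A plain `HashMap` is used for brevity, which does create per-entry objects; a real implementation would likely use a primitive-keyed map to avoid them.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of one copying segment: a buffer, a write offset, and an index.
final class CopySegment {
    record EntryKey(long ledgerId, long entryId) {}
    record Location(int offset, int length) {} // length is an assumption of this sketch

    private final ByteBuffer buffer;
    private final Map<EntryKey, Location> index = new HashMap<>();
    private int writeOffset = 0;

    CopySegment(int capacityBytes) {
        this.buffer = ByteBuffer.allocateDirect(capacityBytes); // direct memory, off-heap
    }

    /** Copies the payload in; returns false when the segment is full and should be rotated. */
    synchronized boolean put(long ledgerId, long entryId, byte[] payload) {
        if (payload.length > buffer.capacity() - writeOffset) {
            return false;
        }
        ByteBuffer view = buffer.duplicate(); // independent position, shared content
        view.position(writeOffset);
        view.put(payload);
        index.put(new EntryKey(ledgerId, entryId), new Location(writeOffset, payload.length));
        writeOffset += payload.length;
        return true;
    }

    /** Returns a copy of the cached payload, or null on a miss. */
    synchronized byte[] get(long ledgerId, long entryId) {
        Location loc = index.get(new EntryKey(ledgerId, entryId));
        if (loc == null) {
            return null;
        }
        byte[] copy = new byte[loc.length()];
        ByteBuffer view = buffer.duplicate();
        view.position(loc.offset());
        view.get(copy);
        return copy;
    }

    /** Recycles the segment: old bytes stay in place and are simply overwritten later. */
    synchronized void clear() {
        index.clear();
        writeOffset = 0;
    }
}
```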
Keeping reference-counted buffers in the cache
Each segment in the cache will contain a map (ledgerId, entryId) -> ByteBuf.
Buffers will have an increased reference count that will keep the data alive as long
as the buffer is in the cache, and the reference will be released when the cache segment is rotated.
The advantage is we avoid any memory copy when inserting into or reading from the cache.
The disadvantage is that we will have references to all the ByteBuf objects that are in the cache.
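The reference-counting variant can be illustrated with the sketch below. To keep it self-contained it does not use Netty: `CountedBuffer` is a hypothetical stand-in that models only the `retain()` / `release()` behaviour of a `ByteBuf`, and `RefCountedSegment` and `EntryKey` are illustrative names as well.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a segment that holds references instead of copies.
final class RefCountedSegment {
    record EntryKey(long ledgerId, long entryId) {}

    /** Stand-in for a Netty ByteBuf: only the reference counting is modelled. */
    static final class CountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        CountedBuffer retain() { refCnt.incrementAndGet(); return this; }
        boolean release() { return refCnt.decrementAndGet() == 0; }
        int refCnt() { return refCnt.get(); }
    }

    private final Map<EntryKey, CountedBuffer> entries = new HashMap<>();

    /** The cache takes its own reference, so the data stays alive while it is cached. */
    synchronized void put(long ledgerId, long entryId, CountedBuffer buf) {
        CountedBuffer previous = entries.put(new EntryKey(ledgerId, entryId), buf.retain());
        if (previous != null) {
            previous.release(); // drop the reference held for the replaced entry
        }
    }

    /** Returns the buffer with an extra reference; the caller must release it. */
    synchronized CountedBuffer get(long ledgerId, long entryId) {
        CountedBuffer buf = entries.get(new EntryKey(ledgerId, entryId));
        return buf == null ? null : buf.retain();
    }

    /** Called when the segment is rotated: releases every reference the cache holds. */
    synchronized void clear() {
        for (CountedBuffer buf : entries.values()) {
            buf.release();
        }
        entries.clear();
    }
}
```

No payload bytes are copied on insert or read; the trade-off is that the map keeps one Java object reference per cached entry until the segment is rotated.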
API changes
No user-facing API changes are required.
New configuration options
The existing cache implementation will not be removed at this point. Users will
be able to configure the old implementation in broker.conf.
This option will be helpful in case performance regressions are seen for
some use cases with the new cache implementation.
btw. The current broker cache seems to be broken for scenarios where there are multiple active cursors (/consumers). That happened in 2.8.2 / 2.9.0. I have created an issue #16054.