Cache compressed cluster state size #39827
Conversation
Today we compute the size of the compressed cluster state for each `cluster:monitor/state` action. We do so by serializing the whole cluster state and compressing it, and this happens on the network thread. This calculation can be rather expensive if the cluster state is large, and these actions can be rather frequent particularly if there are sniffing transport clients in use. Also the calculation is a simple function of the cluster state, so there is a lot of duplicated work here. This change introduces a small cache for this size computation to avoid all this duplicated work, and to avoid blocking network threads. Fixes elastic#39806.
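To make the idea concrete, here is a minimal sketch of such a cache in plain JDK Java. It is not the PR's actual implementation (which, per the diff excerpts below, uses `getOrComputeCachedSize` and Elasticsearch's `ActionListener`): the names `CompressedSizeCache` and `getOrComputeSize`, and the bound of two entries, are illustrative, and `CompletableFuture` stands in for Elasticsearch's own listener/future machinery.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

// Sketch only: cache the compressed cluster-state size per cluster state version
// so the serialize-and-compress work runs at most once per version, and always
// on a background executor rather than on the network thread handling the request.
class CompressedSizeCache {
    private static final int MAX_ENTRIES = 2; // illustrative bound: keep only recent versions

    private final Executor backgroundExecutor;

    // Bounded, insertion-ordered map: adding a new version evicts the eldest entry.
    private final Map<Long, CompletableFuture<Long>> sizeByVersion =
        new LinkedHashMap<Long, CompletableFuture<Long>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, CompletableFuture<Long>> eldest) {
                return size() > MAX_ENTRIES;
            }
        };

    CompressedSizeCache(Executor backgroundExecutor) {
        this.backgroundExecutor = backgroundExecutor;
    }

    // The first caller for a given version starts the computation on the background
    // executor; concurrent and later callers for the same version share that future.
    synchronized CompletableFuture<Long> getOrComputeSize(long clusterStateVersion,
                                                          LongSupplier computeCompressedSize) {
        return sizeByVersion.computeIfAbsent(clusterStateVersion,
            version -> CompletableFuture.supplyAsync(computeCompressedSize::getAsLong, backgroundExecutor));
    }
}
```

A caller would attach its callback to the returned future, e.g. `cache.getOrComputeSize(state.version(), () -> compressAndMeasure(state)).thenAccept(size -> ...)`, where `compressAndMeasure` is a stand-in for the real serialize-and-compress step, so the network thread never does that work itself.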
Pinging @elastic/es-distributed
I think we could be a little less blocking here, but otherwise LGTM :)
EDIT: seems the test failure in ci-1 could be related though?
clusterStateSizeByVersionCache.getOrComputeCachedSize(currentState.version(),
    () -> PublicationTransportHandler.serializeFullClusterState(currentState, Version.CURRENT).length(),
    ActionListener.wrap(size ->
NIT: you could use the new `ActionListener#map` here to make this a little nicer :)
Today is a day of learning :)
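For context on the `ActionListener#map` suggestion above: a `map`-style helper turns a listener for one value type into a listener for another by applying a conversion before delegating, which removes the hand-rolled wrapping at the call site. A minimal, hypothetical illustration follows; the `Listener` interface and `Listeners.map` helper here are stand-ins, and the real `ActionListener#map` in Elasticsearch may differ in shape and signature.

```java
import java.util.function.Function;

// Hypothetical minimal listener interface, standing in for ActionListener.
interface Listener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

final class Listeners {
    // map(delegate, fn) yields a listener that converts its input with fn and
    // forwards the converted value (or any failure) to the wrapped listener.
    static <From, To> Listener<From> map(Listener<To> delegate, Function<From, To> fn) {
        return new Listener<From>() {
            @Override
            public void onResponse(From value) {
                final To converted;
                try {
                    converted = fn.apply(value);
                } catch (Exception e) {
                    delegate.onFailure(e);
                    return;
                }
                delegate.onResponse(converted);
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e);
            }
        };
    }
}
```

With such a helper, the conversion from the computed size to the response lives in one place instead of being hand-wrapped at each call site.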
synchronized void getOrComputeCachedSize(final long clusterStateVersion, |
Maybe block a little less here, by simply using a `Collections.synchronizedMap()` on the map (that'll be safe with `computeIfAbsent`) since the `addListener` call is thread-safe anyway, and it would probably be nice to have it resolve outside a `synchronized` block, which it currently won't for cached values?
`Collections.synchronizedMap()` doesn't let me bound the size of the map by overriding `removeEldestEntry()`. But you're right about calling `addListener` outside the mutex, I'll do that.
@DaveCTurner you can just create the same map you're already creating and wrap it with `java.util.Collections#synchronizedMap`? :) That said, it doesn't really matter, it's just less code than having your own mutex handling.
Oh right, I was thinking of `newConcurrentMap()`. Got it.
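To make the outcome of this sub-thread concrete, here is a variant of the earlier sketch that follows the suggestion: the same `removeEldestEntry`-bounded `LinkedHashMap`, but wrapped with `Collections.synchronizedMap` instead of guarded by an explicit mutex. The names and the bound are again illustrative, not the code that was merged.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

// Illustrative variant of the earlier sketch: thread safety comes from
// Collections.synchronizedMap rather than synchronizing the enclosing method.
class SynchronizedMapVariant {
    private static final int MAX_ENTRIES = 2; // illustrative bound

    private final Executor backgroundExecutor;

    // The wrapper synchronizes computeIfAbsent and delegates to the underlying
    // LinkedHashMap, so removeEldestEntry-based eviction still happens on insert.
    private final Map<Long, CompletableFuture<Long>> sizeByVersion = Collections.synchronizedMap(
        new LinkedHashMap<Long, CompletableFuture<Long>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, CompletableFuture<Long>> eldest) {
                return size() > MAX_ENTRIES;
            }
        });

    SynchronizedMapVariant(Executor backgroundExecutor) {
        this.backgroundExecutor = backgroundExecutor;
    }

    // No synchronized keyword needed here: the map holds its own lock only for the
    // brief lookup-or-insert, and callers attach their callbacks to the returned
    // future outside any lock.
    CompletableFuture<Long> getOrComputeSize(long clusterStateVersion,
                                             LongSupplier computeCompressedSize) {
        return sizeByVersion.computeIfAbsent(clusterStateVersion,
            version -> CompletableFuture.supplyAsync(computeCompressedSize::getAsLong, backgroundExecutor));
    }
}
```

Because callbacks are registered on the returned future after the lookup returns, cached values resolve outside any lock, which is the behaviour the review comment above was asking for.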
LGTM, thanks!
Why are we opting for a more intricate solution (forking to another thread pool, and a synchronized cache of futures) instead of merely forking the entirety of these requests over to the generic thread pool? The problem went from "we shouldn't do this on the network thread" to a solution that takes part of it off the network thread, and has an optimization built in. Are we convinced we need the latter part?
@jasontedor @DaveCTurner as far as I understand it there can be situations here where the request rate gets pretty high in large clusters (see the SDH attached to the issue), which is why I figured this may be worth it. I was thinking the same though: #39806 (comment) ... but not sure if we can do a reasonable benchmark to decide.
The hot threads output from the support case that triggered this shows that almost every node is using 30%+ of a CPU just calculating the size of the compressed cluster state, with some of them using 300%+. If we simply forked onto the generic thread pool we would avoid doing this calculation on the network thread, but we'd still be repeating all that work for each call.
@DaveCTurner (not wanting to put words in Jason's mouth) I think the contention isn't so much with the caching per se, but rather with the complexity of resolving the listeners on the transport thread if there's a cached value and manually forking to the generic thread pool if there isn't.
I also want to push back and question whether or not we really need to be reporting this. That is, I want to revisit #3415 and wonder if we can get away with removing this altogether?
@jasontedor I agree that we should simply stop reporting this in 8.0. However, I think we shouldn't introduce that breaking change into 7.0, but I don't hold that opinion very tightly, so if you think that's ok then I can do that instead.
Dismissing this to indicate that we're still discussing if this is the right approach.
I'm okay with deprecating in 6.7 and breaking in 7.0.
@DaveCTurner @jasontedor I was using
The proposal is that we remove this indeed, to me it has questionable value. Cluster states are tens to hundreds of megabytes, and typically anything beyond that is a sign that something is wrong. I don't think we need reporting within the API for the compressed size of a single file given that its size doesn't vary too wildly, and its uncompressed size can be read off disk. If we really need API reporting here, I would favor that we report the uncompressed size on disk, but I question the value of that too.
Closing in favour of #39951.