Skip to content

Cache compressed cluster state size #39827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
package org.elasticsearch.action.admin.cluster.state;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand All @@ -34,24 +38,33 @@
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

public class TransportClusterStateAction extends TransportMasterNodeReadAction<ClusterStateRequest, ClusterStateResponse> {

private static final Logger logger = LogManager.getLogger(TransportClusterStateAction.class);

private final ClusterStateSizeByVersionCache clusterStateSizeByVersionCache;

@Inject
public TransportClusterStateAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(ClusterStateAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
ClusterStateRequest::new, indexNameExpressionResolver);
clusterStateSizeByVersionCache = new ClusterStateSizeByVersionCache(threadPool);
}

@Override
Expand All @@ -65,7 +78,6 @@ protected ClusterBlockException checkBlock(ClusterStateRequest request, ClusterS
// cluster state calls are done also on a fully blocked cluster to figure out what is going
// on in the cluster. For example, which nodes have joined yet the recovery has not yet kicked
// in, we need to make sure we allow those calls
// return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
return null;
}

Expand All @@ -79,9 +91,8 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
final ActionListener<ClusterStateResponse> listener) throws IOException {

if (request.waitForMetaDataVersion() != null) {
final Predicate<ClusterState> metadataVersionPredicate = clusterState -> {
return clusterState.metaData().version() >= request.waitForMetaDataVersion();
};
final Predicate<ClusterState> metadataVersionPredicate
= clusterState -> clusterState.metaData().version() >= request.waitForMetaDataVersion();
final ClusterStateObserver observer =
new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext());
final ClusterState clusterState = observer.setAndGetObservedState();
Expand Down Expand Up @@ -121,7 +132,7 @@ public void onTimeout(TimeValue timeout) {

private void buildResponse(final ClusterStateRequest request,
final ClusterState currentState,
final ActionListener<ClusterStateResponse> listener) throws IOException {
final ActionListener<ClusterStateResponse> listener) {
logger.trace("Serving cluster state request using version {}", currentState.version());
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
Expand Down Expand Up @@ -182,9 +193,54 @@ private void buildResponse(final ClusterStateRequest request,
}
}
}
listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build(),
PublicationTransportHandler.serializeFullClusterState(currentState, Version.CURRENT).length(), false));

clusterStateSizeByVersionCache.getOrComputeCachedSize(currentState.version(),
() -> PublicationTransportHandler.serializeFullClusterState(currentState, Version.CURRENT).length(),
ActionListener.map(listener, size -> new ClusterStateResponse(currentState.getClusterName(), builder.build(), size, false)));
}

static class ClusterStateSizeByVersionCache {

// allow space for the sizes of the current and previous two cluster states to allow for out-of-order calls
static final int CACHE_SIZE = 3;

private final ThreadPool threadPool;

ClusterStateSizeByVersionCache(ThreadPool threadPool) {
this.threadPool = threadPool;
}

// The cluster state version might not completely determine its compressed size, because some changes (e.g. applying the no-master
// block) are made without updating its version. In practice it's close enough: most changes do update its version, and the
// size computed here is only used for reporting, so this is a reasonable approximation.
private Map<Long, PlainListenableActionFuture<Integer>> clusterStateSizeByVersionCache
= Collections.synchronizedMap(new LinkedHashMap<Long, PlainListenableActionFuture<Integer>>(CACHE_SIZE) {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > CACHE_SIZE;
}
});

void getOrComputeCachedSize(final long clusterStateVersion,
final CheckedSupplier<Integer, IOException> sizeSupplier,
final ActionListener<Integer> listener) {
clusterStateSizeByVersionCache.computeIfAbsent(clusterStateVersion, v -> {
final PlainListenableActionFuture<Integer> future = PlainListenableActionFuture.newListenableFuture();
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug(
new ParameterizedMessage("failed to compute size of cluster state version {}", clusterStateVersion), e);
future.onFailure(e);
}

@Override
protected void doRun() throws IOException {
future.onResponse(sizeSupplier.get());
}
});
return future;
}).addListener(listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public void testWaitForMetaDataVersion() throws Exception {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.waitForTimeout(TimeValue.timeValueHours(1));
ActionFuture<ClusterStateResponse> future1 = client().admin().cluster().state(clusterStateRequest);
assertThat(future1.isDone(), is(true));
assertThat(future1.actionGet().isWaitForTimedOut(), is(false));
long metadataVersion = future1.actionGet().getState().getMetaData().version();

Expand Down Expand Up @@ -64,9 +63,6 @@ public void testWaitForMetaDataVersion() throws Exception {
clusterStateRequest.waitForMetaDataVersion(metadataVersion + 1);
clusterStateRequest.waitForTimeout(TimeValue.timeValueMillis(500)); // Fail fast
ActionFuture<ClusterStateResponse> future3 = client().admin().cluster().state(clusterStateRequest);
assertBusy(() -> {
assertThat(future3.isDone(), is(true));
});
response = future3.actionGet();
assertThat(response.isWaitForTimedOut(), is(true));
assertThat(response.getState(), nullValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.state;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Assert;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class TransportClusterStateActionTests extends ESTestCase {

public void testClusterStateSizeByVersionCacheDoesNotDuplicateWork() throws Exception {

final int cacheSize = TransportClusterStateAction.ClusterStateSizeByVersionCache.CACHE_SIZE;
final List<AtomicInteger> callCounts
= IntStream.range(0, cacheSize).mapToObj(i -> new AtomicInteger()).collect(Collectors.toList());

final int threadCount = cacheSize * 3;

final List<PlainActionFuture<Integer>> resultFutures
= IntStream.range(0, threadCount).mapToObj(i -> new PlainActionFuture<Integer>()).collect(Collectors.toList());

final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount + 1);
final long seed = randomLong();

final String testExceptionMessage = "test IOException";

final ThreadPool threadPool = new TestThreadPool("test");
try {
final TransportClusterStateAction.ClusterStateSizeByVersionCache cache
= new TransportClusterStateAction.ClusterStateSizeByVersionCache(threadPool);

final int expiredVersion = cacheSize + 1;
{
// add an entry to the cache so we can check it was pushed out of the cache later
final int expiredSize = randomIntBetween(0, 100);
cache.getOrComputeCachedSize(expiredVersion, () -> expiredSize,
ActionListener.wrap(i -> assertThat(i, equalTo(expiredSize)), Assert::assertNotNull));
}

final List<Thread> threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(() -> {
try {
final Random random = new Random(seed + i);
final int clusterStateVersion = i % cacheSize;
final int size = random.nextInt();
cyclicBarrier.await();
cache.getOrComputeCachedSize(clusterStateVersion, () -> {
callCounts.get(clusterStateVersion).incrementAndGet();
if (rarely(random)) {
throw new IOException(testExceptionMessage);
} else {
return size;
}
}, resultFutures.get(i));

} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
})).collect(Collectors.toList());

threads.forEach(Thread::start);
cyclicBarrier.await();
for (Thread thread : threads) {
thread.join();
}

for (PlainActionFuture<Integer> resultFuture : resultFutures) {
try {
resultFuture.actionGet();
} catch (Exception e) {
final Throwable rootCause = new ElasticsearchException(e).getRootCause();
assertThat(e.toString(), rootCause, instanceOf(IOException.class));
assertThat(e.toString(), rootCause.getMessage(), equalTo(testExceptionMessage));
}
}

for (AtomicInteger callCount : callCounts) {
assertThat(callCount.get(), equalTo(1));
}

{
// check the expired entry was pushed out of the cache so requires recalculation
final int newExpiredSize = randomIntBetween(0, 100);
final PlainActionFuture<Integer> future = new PlainActionFuture<>();
cache.getOrComputeCachedSize(expiredVersion, () -> newExpiredSize, future);
assertThat(future.actionGet(), equalTo(newExpiredSize));
}

} finally {
threadPool.shutdown();
}
}
}