Skip to content

Rework shutdown no-op tests to avoid mocks #86236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -129,6 +131,68 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove
return clusterService;
}

public static ClusterService createSingleThreadedClusterService() {
final var threadPool = new DeterministicTaskQueue().getThreadPool();
final var clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final var localNode = new DiscoveryNode(
"node",
ESTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(),
DiscoveryNodeRole.roles(),
Version.CURRENT
);
// run tasks inline, no need to fork to a separate thread
final var directExecutor = new PrioritizedEsThreadPoolExecutor(
"direct",
1,
1,
1,
TimeUnit.SECONDS,
r -> { throw new AssertionError("should not create new threads"); },
null,
null,
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER
) {
@Override
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
command.run();
}

@Override
public void execute(Runnable command) {
command.run();
}
};

final var clusterApplierService = new ClusterApplierService("test", Settings.EMPTY, clusterSettings, threadPool) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return directExecutor;
}
};
clusterApplierService.setInitialState(
ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build()
);

final var masterService = new MasterService(Settings.EMPTY, clusterSettings, threadPool) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return directExecutor;
}
};
masterService.setClusterStatePublisher(createClusterStatePublisher(clusterApplierService));
masterService.setClusterStateSupplier(clusterApplierService::state);

final var clusterService = new ClusterService(Settings.EMPTY, clusterSettings, masterService, clusterApplierService);
clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
clusterService.setRerouteService((s, p, r) -> r.onResponse(clusterApplierService.state()));
clusterService.start();
return clusterService;
}

public static NodeConnectionsService createNoOpNodeConnectionsService() {
return new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,55 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskContext;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.shutdown.TransportDeleteShutdownNodeAction.DeleteShutdownNodeExecutor;
import org.elasticsearch.xpack.shutdown.TransportDeleteShutdownNodeAction.DeleteShutdownNodeTask;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.TYPE;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TransportDeleteShutdownNodeActionTests extends ESTestCase {
private ClusterService clusterService;
private TransportDeleteShutdownNodeAction action;

// must use member mock for generic
@Mock
private TaskContext<DeleteShutdownNodeTask> taskContext;

@Before
public void init() {
MockitoAnnotations.openMocks(this);
// TODO: it takes almost 2 seconds to create these mocks....WHY?!?
var threadPool = mock(ThreadPool.class);
var transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
var actionFilters = mock(ActionFilters.class);
var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
action = new TransportDeleteShutdownNodeAction(
transportService,
public void testNoop() throws Exception {
final var clusterService = ClusterServiceUtils.createSingleThreadedClusterService();
final var appliedStateCount = new AtomicInteger();
clusterService.addListener(event -> appliedStateCount.incrementAndGet());
final var action = new TransportDeleteShutdownNodeAction(
mock(TransportService.class),
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
clusterService.getClusterApplierService().threadPool(),
new ActionFilters(Collections.emptySet()),
TestIndexNameExpressionResolver.newInstance()
);
}

public void testNoop() throws Exception {
var singleNodeMetadata = mock(SingleNodeShutdownMetadata.class);
var nodesShutdownMetadata = new NodesShutdownMetadata(Map.of("node1", singleNodeMetadata));
var metadata = Metadata.builder().putCustom(TYPE, nodesShutdownMetadata).build();
var clusterStateWithShutdown = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();

var request = new DeleteShutdownNodeAction.Request("node1");
action.masterOperation(null, request, clusterStateWithShutdown, ActionListener.noop());
var updateTask = ArgumentCaptor.forClass(DeleteShutdownNodeTask.class);
var taskConfig = ArgumentCaptor.forClass(ClusterStateTaskConfig.class);
var taskExecutor = ArgumentCaptor.forClass(DeleteShutdownNodeExecutor.class);
verify(clusterService).submitStateUpdateTask(any(), updateTask.capture(), taskConfig.capture(), taskExecutor.capture());
when(taskContext.getTask()).thenReturn(updateTask.getValue());
ClusterState gotState = taskExecutor.getValue().execute(ClusterState.EMPTY_STATE, List.of(taskContext));
assertThat(gotState, sameInstance(ClusterState.EMPTY_STATE));
clusterService.getClusterApplierService()
.onNewClusterState(
"setup",
() -> clusterService.state()
.copyAndUpdateMetadata(
mb -> mb.putCustom(TYPE, new NodesShutdownMetadata(Map.of("node1", mock(SingleNodeShutdownMetadata.class))))
),
ActionListener.noop()
);
assertThat(appliedStateCount.get(), equalTo(1));
final var initialState = clusterService.state();
final var request = new DeleteShutdownNodeAction.Request("node1");

// run the action to remove the shutdown entry
action.masterOperation(null, request, initialState, ActionListener.noop());
assertThat(appliedStateCount.get(), equalTo(2));
final var removedState = clusterService.state();

// run the action again (with stale state) and observe that no further state is published
action.masterOperation(null, request, initialState, ActionListener.noop());
assertThat(appliedStateCount.get(), equalTo(2));
assertSame(removedState, clusterService.state());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,83 +9,54 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskContext;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.shutdown.TransportPutShutdownNodeAction.PutShutdownNodeExecutor;
import org.elasticsearch.xpack.shutdown.TransportPutShutdownNodeAction.PutShutdownNodeTask;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.List;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class TransportPutShutdownNodeActionTests extends ESTestCase {

private ClusterService clusterService;
private TransportPutShutdownNodeAction action;

// must use member mock for generic
@Mock
private TaskContext<PutShutdownNodeTask> taskContext;

@Before
public void init() {
MockitoAnnotations.openMocks(this);
// TODO: it takes almost 2 seconds to create these mocks....WHY?!?
var threadPool = mock(ThreadPool.class);
var transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
var actionFilters = mock(ActionFilters.class);
var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
action = new TransportPutShutdownNodeAction(
transportService,
public void testNoop() {
final var clusterService = ClusterServiceUtils.createSingleThreadedClusterService();
final var appliedStateCount = new AtomicInteger();
clusterService.addListener(event -> appliedStateCount.incrementAndGet());
final var action = new TransportPutShutdownNodeAction(
mock(TransportService.class),
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
clusterService.getClusterApplierService().threadPool(),
new ActionFilters(Collections.emptySet()),
TestIndexNameExpressionResolver.newInstance()
);
}

public void testNoop() throws Exception {
var type = randomFrom(Type.REMOVE, Type.REPLACE, Type.RESTART);
var allocationDelay = type == Type.RESTART ? TimeValue.timeValueMinutes(randomIntBetween(1, 3)) : null;
var targetNodeName = type == Type.REPLACE ? randomAlphaOfLength(5) : null;
var request = new PutShutdownNodeAction.Request("node1", type, "sunsetting", allocationDelay, targetNodeName);
action.masterOperation(null, request, ClusterState.EMPTY_STATE, ActionListener.noop());
var updateTask = ArgumentCaptor.forClass(PutShutdownNodeTask.class);
var taskConfig = ArgumentCaptor.forClass(ClusterStateTaskConfig.class);
var taskExecutor = ArgumentCaptor.forClass(PutShutdownNodeExecutor.class);
verify(clusterService).submitStateUpdateTask(any(), updateTask.capture(), taskConfig.capture(), taskExecutor.capture());
when(taskContext.getTask()).thenReturn(updateTask.getValue());
ClusterState stableState = taskExecutor.getValue().execute(ClusterState.EMPTY_STATE, List.of(taskContext));

// run the request again, there should be no call to submit an update task
clearInvocations(clusterService);
final var initialState = clusterService.state();
final var type = randomFrom(Type.REMOVE, Type.REPLACE, Type.RESTART);
final var allocationDelay = type == Type.RESTART ? TimeValue.timeValueMinutes(randomIntBetween(1, 3)) : null;
final var targetNodeName = type == Type.REPLACE ? randomAlphaOfLength(5) : null;
final var request = new PutShutdownNodeAction.Request("node1", type, "sunsetting", allocationDelay, targetNodeName);

// run the request against the initial state - the master service should compute and apply the update
action.masterOperation(null, request, initialState, ActionListener.noop());
assertThat(appliedStateCount.get(), equalTo(1));
var stableState = clusterService.state();
assertNotSame(initialState, stableState);

// run the request again with stale state to bypass the action's noop detection - the master service should compute an update based
// on the last-applied state, determine it's a no-op, and skip publication
action.masterOperation(null, request, initialState, ActionListener.noop());
assertThat(appliedStateCount.get(), equalTo(1));
assertSame(stableState, clusterService.state());

// run the request again but with the latest state - the action should not even submit a task to the master
clusterService.getMasterService().setClusterStateSupplier(() -> { throw new AssertionError("should not submit task"); });
action.masterOperation(null, request, stableState, ActionListener.noop());
verifyNoInteractions(clusterService);

// run the request again with empty state, the update task should return the same state
action.masterOperation(null, request, ClusterState.EMPTY_STATE, ActionListener.noop());
verify(clusterService).submitStateUpdateTask(any(), updateTask.capture(), taskConfig.capture(), taskExecutor.capture());
when(taskContext.getTask()).thenReturn(updateTask.getValue());
ClusterState gotState = taskExecutor.getValue().execute(stableState, List.of(taskContext));
assertThat(gotState, sameInstance(stableState));
assertThat(appliedStateCount.get(), equalTo(1));
assertSame(stableState, clusterService.state());
}
}