Skip to content

New TransportBroadcastUnpromotableAction action #93600

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
022408c
New TransportBroadcastUnpromotableAction action
kingherc Feb 7, 2023
90c08c0
Update docs/changelog/93600.yaml
kingherc Feb 8, 2023
0f29dea
Fix style
kingherc Feb 8, 2023
295670c
Fix PR comments
kingherc Feb 9, 2023
16466cc
Remove ActionType
kingherc Feb 9, 2023
035443a
Fix style
kingherc Feb 9, 2023
19f3fe2
Rename action
kingherc Feb 9, 2023
924e3d9
Fix RBACEngine checks
kingherc Feb 9, 2023
a88a90c
Pass IndexShardRoutingTable to action
kingherc Feb 9, 2023
ea98faa
Merge remote-tracking branch 'main' into feature/ES-5454-action-broad…
kingherc Feb 10, 2023
e3c9d6d
Fix checkstyle
kingherc Feb 10, 2023
a5a4572
Fix leftover TODOs
kingherc Feb 10, 2023
8f2c866
Fix test
kingherc Feb 10, 2023
a23edcb
Rename primaryShardId to shardId
kingherc Feb 10, 2023
6ed6de6
Make refresh result part of the replica request
kingherc Feb 13, 2023
f8de888
Do not fork again
kingherc Feb 13, 2023
fa3b88d
Merge remote-tracking branch 'main' into feature/ES-5454-action-broad…
kingherc Feb 13, 2023
5a920e4
Fix merge conflict
kingherc Feb 13, 2023
e276295
Merge remote-tracking branch 'main' into feature/ES-5454-action-broad…
kingherc Feb 13, 2023
f802a6c
Fail if primary customization logic fails
kingherc Feb 13, 2023
bc535c3
Register new exception
kingherc Feb 13, 2023
0ce144d
Add new exception to test
kingherc Feb 14, 2023
cfadcd7
Fail unpromotable like replica shard failures
kingherc Feb 14, 2023
9599493
Null node throws NodeNotConnectedException
kingherc Feb 14, 2023
99c2fe1
Revert changing private fields in routing class
kingherc Feb 14, 2023
86cfd68
Small PR comment
kingherc Feb 14, 2023
2b9fb3f
Broadcast request implements IndicesRequest
kingherc Feb 14, 2023
d1c3f10
Fix checkstyle
kingherc Feb 14, 2023
4d1598a
Small serialization optimization
kingherc Feb 14, 2023
0146161
Merge remote-tracking branch 'main' into feature/ES-5454-action-broad…
kingherc Feb 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/93600.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93600
summary: New `TransportBroadcastUnpromotableAction` action
area: CRUD
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -269,7 +271,7 @@ private static void installMockTransportVerifications(RoutingTableWatcher routin
connection.sendRequest(requestId, action, request, options);
});
mockTransportService.addRequestHandlingBehavior(
TransportUnpromotableShardRefreshAction.NAME,
TransportUnpromotableShardRefreshAction.NAME + "[u]",
(handler, request, channel, task) -> {
// Skip handling the request and send an immediate empty response
channel.sendResponse(ActionResponse.Empty.INSTANCE);
Expand Down Expand Up @@ -690,6 +692,63 @@ public void testRefreshOfUnpromotableShards() throws Exception {
}
}

/**
 * Checks that a refresh reports shard failures when the per-shard unpromotable refresh request cannot be sent to the node
 * that was started with the unpromotable-only attribute: the send behaviour installed below throws a
 * {@link ConnectTransportException} for that request instead of sending it.
 */
public void testRefreshFailsIfUnpromotableDisconnects() throws Exception {
    final var watcher = new RoutingTableWatcher();
    final var extraUnpromotableNodes = 1;
    watcher.numReplicas = watcher.numIndexingCopies + extraUnpromotableNodes - 1;
    internalCluster().ensureAtLeastNumDataNodes(watcher.numIndexingCopies + 1);

    // one additional data node, tagged with the unpromotable-only node attribute
    final Settings unpromotableOnlyNodeSettings = Settings.builder()
        .put("node.attr." + TestPlugin.NODE_ATTR_UNPROMOTABLE_ONLY, "true")
        .build();
    final String unpromotableOnlyNode = internalCluster().startDataOnlyNode(unpromotableOnlyNodeSettings);

    installMockTransportVerifications(watcher);
    getMasterNodePlugin().numIndexingCopies = watcher.numIndexingCopies;

    final var masterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    try {
        // verify the correct number of shard copies of each role as the routing table evolves
        masterService.addListener(watcher);

        // refresh interval of -1 so that only the explicit refresh issued below is in play
        final Settings indexSettings = Settings.builder()
            .put(watcher.getIndexSettings())
            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false)
            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
            .build();
        createIndex(INDEX_NAME, indexSettings);
        ensureGreen(INDEX_NAME);
        assertEngineTypes();

        indexRandom(false, INDEX_NAME, randomIntBetween(1, 10));

        // on every node, refuse to send the "[u]" refresh request to the unpromotable-only node
        final String blockedAction = TransportUnpromotableShardRefreshAction.NAME + "[u]";
        for (var service : internalCluster().getInstances(TransportService.class)) {
            final var mockService = (MockTransportService) service;
            mockService.addSendBehavior((connection, requestId, action, request, options) -> {
                final boolean blocked = action.equals(blockedAction) && unpromotableOnlyNode.equals(connection.getNode().getName());
                if (blocked == false) {
                    connection.sendRequest(requestId, action, request, options);
                    return;
                }
                logger.info("--> preventing {} request by throwing ConnectTransportException", action);
                throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
            });
        }

        final RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(INDEX_NAME).execute().actionGet();

        // replicas that are not indexing copies, i.e. the unpromotable copies of each shard
        final var unpromotableCopiesPerShard = watcher.numReplicas - (watcher.numIndexingCopies - 1);
        assertThat(
            "each unpromotable replica shard should be added to the shard failures",
            refreshResponse.getFailedShards(),
            equalTo(unpromotableCopiesPerShard * watcher.numShards)
        );
        assertThat(
            "the total shards is incremented with the unpromotable shard failures",
            refreshResponse.getTotalShards(),
            equalTo(refreshResponse.getSuccessfulShards() + refreshResponse.getFailedShards())
        );
    } finally {
        masterService.removeListener(watcher);
    }
}

public void testNodesWithUnpromotableShardsNeverGetReplicationActions() throws Exception {
var routingTableWatcher = new RoutingTableWatcher();
var additionalNumberOfNodesWithUnpromotableShards = randomIntBetween(1, 3);
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
exports org.elasticsearch.action.support;
exports org.elasticsearch.action.support.broadcast;
exports org.elasticsearch.action.support.broadcast.node;
exports org.elasticsearch.action.support.broadcast.unpromotable;
exports org.elasticsearch.action.support.master;
exports org.elasticsearch.action.support.master.info;
exports org.elasticsearch.action.support.nodes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

/**
 * The replica-side request of the shard refresh action, i.e. the request that is sent to the promotable replicas of a
 * primary shard.
 */
public class ShardRefreshReplicaRequest extends ReplicationRequest<ShardRefreshReplicaRequest> {

    /**
     * The refresh result obtained on the primary shard. {@link TransportShardRefreshAction} uses it to build the
     * {@link UnpromotableShardRefreshRequest} that is broadcast to the unpromotable replicas. Promotable replicas do not need
     * this value, so it is deliberately left out of serialization to keep their refresh requests backwards compatible; that
     * is also why the field is package-private.
     */
    final Engine.RefreshResult primaryRefreshResult;

    /**
     * Creates a request for the given shard that carries the refresh result of the primary.
     *
     * @param shardId              the shard the request is for
     * @param primaryRefreshResult the result of refreshing the primary shard
     */
    public ShardRefreshReplicaRequest(ShardId shardId, Engine.RefreshResult primaryRefreshResult) {
        super(shardId);
        this.primaryRefreshResult = primaryRefreshResult;
    }

    /**
     * Reads a request from the given stream. The primary refresh result is not read from the stream, so it is set to
     * {@link Engine.RefreshResult#NO_REFRESH}.
     *
     * @param in the stream to read the request from
     * @throws IOException if reading from the stream fails
     */
    public ShardRefreshReplicaRequest(StreamInput in) throws IOException {
        super(in);
        this.primaryRefreshResult = Engine.RefreshResult.NO_REFRESH;
    }

    @Override
    public String toString() {
        return "ShardRefreshReplicaRequest{" + shardId + '}';
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -28,19 +27,14 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class TransportShardRefreshAction extends TransportReplicationAction<
BasicReplicationRequest,
BasicReplicationRequest,
ShardRefreshReplicaRequest,
ReplicationResponse> {

private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);
Expand Down Expand Up @@ -69,10 +63,11 @@ public TransportShardRefreshAction(
shardStateAction,
actionFilters,
BasicReplicationRequest::new,
BasicReplicationRequest::new,
ShardRefreshReplicaRequest::new,
ThreadPool.Names.REFRESH
);
new TransportUnpromotableShardRefreshAction(transportService, actionFilters, indicesService);
// registers the unpromotable version of shard refresh action
new TransportUnpromotableShardRefreshAction(clusterService, transportService, actionFilters, indicesService);
}

@Override
Expand All @@ -84,53 +79,53 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep
protected void shardOperationOnPrimary(
BasicReplicationRequest shardRequest,
IndexShard primary,
ActionListener<PrimaryResult<BasicReplicationRequest, ReplicationResponse>> listener
ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
) {
try (var listeners = new RefCountingListener(listener.map(v -> new PrimaryResult<>(shardRequest, new ReplicationResponse())))) {
var refreshResult = primary.refresh(SOURCE_API);
ActionListener.completeWith(listener, () -> {
ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), primary.refresh(SOURCE_API));
replicaRequest.setParentTask(shardRequest.getParentTask());
logger.trace("{} refresh request executed on primary", primary.shardId());

// Forward the request to all nodes that hold unpromotable replica shards
final ClusterState clusterState = clusterService.state();
final Task parentTaskId = taskManager.getTask(shardRequest.getParentTask().getId());
clusterState.routingTable()
.shardRoutingTable(shardRequest.shardId())
.assignedShards()
.stream()
.filter(Predicate.not(ShardRouting::isPromotableToPrimary))
.map(ShardRouting::currentNodeId)
.collect(Collectors.toUnmodifiableSet())
.forEach(nodeId -> {
final DiscoveryNode node = clusterState.nodes().get(nodeId);
UnpromotableShardRefreshRequest request = new UnpromotableShardRefreshRequest(
primary.shardId(),
refreshResult.generation()
);
logger.trace("forwarding refresh request [{}] to node [{}]", request, node);
transportService.sendChildRequest(
node,
TransportUnpromotableShardRefreshAction.NAME,
request,
parentTaskId,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
listeners.acquire(ignored -> {}),
(in) -> TransportResponse.Empty.INSTANCE,
ThreadPool.Names.REFRESH
)
);
});
} catch (Exception e) {
listener.onFailure(e);
}
return new PrimaryResult<>(replicaRequest, new ReplicationResponse());
});
}

@Override
protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
replica.refresh(SOURCE_API);
logger.trace("{} refresh request executed on replica", replica.shardId());
return new ReplicaResult();
});
}

/**
 * Supplies the replicas proxy used by this action: a new {@link UnpromotableReplicasRefreshProxy} is created on each call.
 *
 * @return a freshly constructed {@link UnpromotableReplicasRefreshProxy}
 */
@Override
protected ReplicationOperation.Replicas<ShardRefreshReplicaRequest> newReplicasProxy() {
return new UnpromotableReplicasRefreshProxy();
}

/**
 * A replicas proxy that, when the primary operation has completed, sends an {@link UnpromotableShardRefreshRequest} for the
 * {@link TransportUnpromotableShardRefreshAction} to the local node.
 */
protected class UnpromotableReplicasRefreshProxy extends ReplicasProxy {

    /**
     * Builds the unpromotable refresh request from the shard routing table and the segment generation of the primary's
     * refresh result, and sends it to the local node.
     *
     * @param replicaRequest         the replica request holding the primary's refresh result
     * @param indexShardRoutingTable the routing table of the shard, passed on to the unpromotable request
     * @param listener               completed with {@code null} when a response arrives; failures are left to the
     *                               delegating listener
     */
    @Override
    public void onPrimaryOperationComplete(
        ShardRefreshReplicaRequest replicaRequest,
        IndexShardRoutingTable indexShardRoutingTable,
        ActionListener<Void> listener
    ) {
        final var primaryResult = replicaRequest.primaryRefreshResult;
        assert primaryResult.refreshed() : "primary has not refreshed";

        final var broadcastRequest = new UnpromotableShardRefreshRequest(indexShardRoutingTable, primaryResult.generation());
        final var localNode = transportService.getLocalNode();
        transportService.sendRequest(
            localNode,
            TransportUnpromotableShardRefreshAction.NAME,
            broadcastRequest,
            new ActionListenerResponseHandler<>(
                // the empty response carries no information, so the caller is simply notified with null
                listener.delegateFailure((delegate, ignoredResponse) -> delegate.onResponse(null)),
                in -> ActionResponse.Empty.INSTANCE,
                ThreadPool.Names.REFRESH
            )
        );
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,42 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.broadcast.unpromotable.TransportBroadcastUnpromotableAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class TransportUnpromotableShardRefreshAction extends HandledTransportAction<UnpromotableShardRefreshRequest, ActionResponse.Empty> {
public static final String NAME = RefreshAction.NAME + "[u]";
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<UnpromotableShardRefreshRequest> {

public static final String NAME = RefreshAction.NAME + "/unpromotable";

private final IndicesService indicesService;

@Inject
public TransportUnpromotableShardRefreshAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndicesService indicesService
) {
super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new, ThreadPool.Names.REFRESH);
super(NAME, clusterService, transportService, actionFilters, UnpromotableShardRefreshRequest::new, ThreadPool.Names.REFRESH);
this.indicesService = indicesService;
}

@Override
protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener<ActionResponse.Empty> responseListener) {
protected void unpromotableShardOperation(
Task task,
UnpromotableShardRefreshRequest request,
ActionListener<ActionResponse.Empty> responseListener
) {
ActionListener.run(responseListener, listener -> {
assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION
: "The request segment is " + request.getSegmentGeneration();
IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id());
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE));
});
}

}
Loading