Skip to content

Commit

Permalink
Add bwc for index/delete requests from pre-6.0 nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
areek committed Dec 8, 2016
1 parent c5b09ad commit 4231aa4
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
Expand Down Expand Up @@ -105,7 +109,8 @@ protected boolean resolveIndex() {
}

@Override
protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) throws Exception {
protected WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();

long[] preVersions = new long[request.items().length];
Expand All @@ -121,7 +126,7 @@ protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, I
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
return new WritePrimaryResult(request, response, location, null, primary);
return new WritePrimaryResult<>(request, response, location, null, primary, logger);
}

/** Executes bulk item requests and handles request execution exceptions */
Expand Down Expand Up @@ -355,7 +360,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
}

@Override
protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
protected WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
Expand Down Expand Up @@ -399,7 +404,7 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I
}
}
}
return new WriteReplicaResult(request, location, null, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}

private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
Expand All @@ -413,6 +418,42 @@ private Translog.Location locationToSync(Translog.Location current, Translog.Loc
return next;
}

public <Request extends ReplicatedWriteRequest<Request>, Response extends ReplicationResponse & WriteResponse>
WritePrimaryResult<Request, Response> executeSingleItemBulkRequestOnPrimary(
Request request, IndexShard primary) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result = shardOperationOnPrimary(bulkShardRequest, primary);
BulkShardResponse bulkShardResponse = result.finalResponseIfSuccessful;
assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response";
BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0];
final Response response;
final Exception failure;
if (itemResponse.isFailed()) {
failure = itemResponse.getFailure().getCause();
response = null;
} else {
response = (Response) itemResponse.getResponse();
failure = null;
}
return new WritePrimaryResult<>(request, response, result.location, failure, primary, logger);
}

public <ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
WriteReplicaResult<ReplicaRequest> executeSingleItemBulkRequestOnReplica(
ReplicaRequest request, IndexShard replica) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WriteReplicaResult<BulkShardRequest> result = shardOperationOnReplica(bulkShardRequest, replica);
return new WriteReplicaResult<>(request, result.location, null, replica, logger);
}

/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,38 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
* Performs the delete operation.
*/
public class TransportDeleteAction extends HandledTransportAction<DeleteRequest, DeleteResponse> {
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {

private final TransportBulkAction bulkAction;
private final TransportShardBulkAction shardBulkAction;

@Inject
public TransportDeleteAction(Settings settings, TransportService transportService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction) {
super(settings, DeleteAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeleteRequest::new);
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
this.bulkAction = bulkAction;
this.shardBulkAction = shardBulkAction;
}

@Override
Expand Down Expand Up @@ -80,7 +87,19 @@ public void onFailure(Exception e) {
}

@Override
protected void doExecute(DeleteRequest request, ActionListener<DeleteResponse> listener) {
throw new UnsupportedOperationException("must have task with request");
protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
}

@Override
protected WritePrimaryResult<DeleteRequest, DeleteResponse> shardOperationOnPrimary(
DeleteRequest request, IndexShard primary) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnPrimary(request, primary);
}

@Override
protected WriteReplicaResult<DeleteRequest> shardOperationOnReplica(
DeleteRequest request, IndexShard replica) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnReplica(request, replica);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -44,16 +49,21 @@
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*/
public class TransportIndexAction extends HandledTransportAction<IndexRequest, IndexResponse> {
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {

private final TransportBulkAction bulkAction;
private final TransportShardBulkAction shardBulkAction;

@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ThreadPool threadPool,
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction) {
super(settings, IndexAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, IndexRequest::new);
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
this.bulkAction = bulkAction;
this.shardBulkAction = shardBulkAction;
}

@Override
Expand Down Expand Up @@ -85,8 +95,20 @@ public void onFailure(Exception e) {
}

@Override
protected void doExecute(IndexRequest request, ActionListener<IndexResponse> listener) {
throw new UnsupportedOperationException("must have task with request");
protected IndexResponse newResponseInstance() {
return new IndexResponse();
}

@Override
protected WritePrimaryResult<IndexRequest, IndexResponse> shardOperationOnPrimary(
IndexRequest request, IndexShard primary) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnPrimary(request, primary);
}

@Override
protected WriteReplicaResult<IndexRequest> shardOperationOnReplica(
IndexRequest request, IndexShard replica) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnReplica(request, replica);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Re
* @param shardRequest the request to the primary shard
* @param primary the primary shard to perform the operation on
*/
protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception;
protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
Request shardRequest, IndexShard primary) throws Exception;

/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
Expand Down Expand Up @@ -359,19 +360,21 @@ public void onFailure(Exception e) {
};
}

protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult> createReplicatedOperation(
Request request, ActionListener<PrimaryResult> listener,
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
);
}
}

protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
protected static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse>
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
final ReplicaRequest replicaRequest;
final Response finalResponseIfSuccessful;
final Exception finalFailure;
public final Response finalResponseIfSuccessful;
public final Exception finalFailure;

/**
* Result of executing a primary operation
Expand Down Expand Up @@ -411,7 +414,7 @@ public void respond(ActionListener<Response> listener) {
}
}

protected class ReplicaResult {
protected static class ReplicaResult {
final Exception finalFailure;

public ReplicaResult(Exception finalFailure) {
Expand Down Expand Up @@ -927,7 +930,8 @@ public ShardRouting routingEntry() {

}

class PrimaryShardReference extends ShardReference implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult> {
class PrimaryShardReference extends ShardReference
implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {

PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
super(indexShard, operationLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
* async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
*/
@Override
protected abstract WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception;
protected abstract WritePrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
Request request, IndexShard primary) throws Exception;

/**
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
Expand All @@ -76,19 +77,24 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
* async refresh is performed on the <code>replica</code> shard according to the <code>ReplicaRequest</code> refresh policy
*/
@Override
protected abstract WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception;
protected abstract WriteReplicaResult<ReplicaRequest> shardOperationOnReplica(
ReplicaRequest request, IndexShard replica) throws Exception;

/**
* Result of taking the action on the primary.
*/
protected class WritePrimaryResult extends PrimaryResult implements RespondingWriteResult {
protected static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response>
implements RespondingWriteResult {
boolean finishedAsyncActions;
public final Location location;
ActionListener<Response> listener = null;

public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
@Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary) {
IndexShard primary, Logger logger) {
super(request, finalResponse, operationFailure);
this.location = location;
assert location == null || operationFailure == null
: "expected either failure to be null or translog location to be null, " +
"but found: [" + location + "] translog location and [" + operationFailure + "] failure";
Expand Down Expand Up @@ -138,13 +144,16 @@ public synchronized void onSuccess(boolean forcedRefresh) {
/**
* Result of taking the action on the replica.
*/
protected class WriteReplicaResult extends ReplicaResult implements RespondingWriteResult {
protected static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
extends ReplicaResult implements RespondingWriteResult {
public final Location location;
boolean finishedAsyncActions;
private ActionListener<TransportResponse.Empty> listener;

public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
@Nullable Exception operationFailure, IndexShard replica) {
@Nullable Exception operationFailure, IndexShard replica, Logger logger) {
super(operationFailure);
this.location = location;
if (operationFailure != null) {
this.finishedAsyncActions = true;
} else {
Expand Down
Loading

0 comments on commit 4231aa4

Please sign in to comment.