Skip to content

Commit

Permalink
Use bulk action interally for update action (#22915)
Browse files Browse the repository at this point in the history
Currently, update action internally uses deprecated index and delete
transport actions. As of #21964, these tranport actions were deprecated
in favour of using single item bulk request. In this commit, update action
uses single item bulk action.
  • Loading branch information
areek authored Feb 2, 2017
1 parent 7520a10 commit ba8ad39
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ protected WriteReplicaResult<Request> shardOperationOnReplica(
}


private ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
public static <Response extends ReplicationResponse & WriteResponse>
ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
return ActionListener.wrap(bulkItemResponses -> {
assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@

package org.elasticsearch.action.update;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportActions;
Expand Down Expand Up @@ -63,24 +61,23 @@
import java.util.Map;

import static org.elasticsearch.ExceptionsHelper.unwrapCause;
import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.toSingleItemBulkRequest;
import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.wrapBulkResponse;

public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {

private final TransportDeleteAction deleteAction;
private final TransportIndexAction indexAction;
private final TransportBulkAction bulkAction;
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
private final IndicesService indicesService;

@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService, AutoCreateIndex autoCreateIndex) {
TransportBulkAction bulkAction, TransportCreateIndexAction createIndexAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, AutoCreateIndex autoCreateIndex) {
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new);
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.bulkAction = bulkAction;
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.indicesService = indicesService;
Expand Down Expand Up @@ -162,7 +159,7 @@ protected ShardIterator shards(ClusterState clusterState, UpdateRequest request)
return new PlainShardIterator(shardIterator.shardId(), Collections.singletonList(shard));
}
}
return new PlainShardIterator(shardIterator.shardId(), Collections.<ShardRouting>emptyList());
return new PlainShardIterator(shardIterator.shardId(), Collections.emptyList());
}

@Override
Expand All @@ -180,101 +177,46 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
IndexRequest upsertRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
if ((request.fetchSource() != null && request.fetchSource().fetchSource()) ||
(request.fields() != null && request.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
} else {
update.setGetResult(null);
}
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}

@Override
public void onFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
if ((request.fetchSource() != null && request.fetchSource().fetchSource()) ||
(request.fields() != null && request.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
} else {
update.setGetResult(null);
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
);

break;
case UPDATED:
IndexRequest indexRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}

@Override
public void onFailure(Exception e) {
final Throwable cause = unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
);
break;
case DELETED:
DeleteRequest deleteRequest = result.action();
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}

@Override
public void onFailure(Exception e) {
final Throwable cause = unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
);
break;
case NOOP:
UpdateResponse update = result.action();
Expand All @@ -291,4 +233,23 @@ protected void doRun() {
throw new IllegalStateException("Illegal result " + result.getResponseResult());
}
}

private void handleUpdateFailureWithRetry(final ActionListener<UpdateResponse> listener, final UpdateRequest request,
final Exception failure, int retryCount) {
final Throwable cause = unwrapCause(failure);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
}

0 comments on commit ba8ad39

Please sign in to comment.