1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -126,6 +126,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix gRPC transport SETTING_GRPC_MAX_MSG_SIZE setting not exposed to users ([#18910](https://github.com/opensearch-project/OpenSearch/pull/18910))
- Reset isPipelineResolved to false to resolve the system ingest pipeline again. ([#18911](https://github.com/opensearch-project/OpenSearch/pull/18911))
- Bug fix for `scaled_float` in `encodePoint` method ([#18952](https://github.com/opensearch-project/OpenSearch/pull/18952))
- Refactor TransportUpdateAction to use TransportShardBulkAction directly for improved performance and elimination of unnecessary network hops ([#15264](https://github.com/opensearch-project/OpenSearch/issues/15264))

### Security

server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java
@@ -39,6 +39,11 @@
import org.opensearch.action.RoutingMissingException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkItemRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.bulk.BulkShardResponse;
import org.opensearch.action.bulk.TransportShardBulkAction;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
@@ -84,8 +89,6 @@
import java.util.Map;

import static org.opensearch.ExceptionsHelper.unwrapCause;
import static org.opensearch.action.bulk.TransportSingleItemBulkWriteAction.toSingleItemBulkRequest;
import static org.opensearch.action.bulk.TransportSingleItemBulkWriteAction.wrapBulkResponse;

/**
* Transport action for updating an index
@@ -99,6 +102,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationAction
private final IndicesService indicesService;
private final NodeClient client;
private final ClusterService clusterService;
private final TransportShardBulkAction shardBulkAction;

@Inject
public TransportUpdateAction(
@@ -110,7 +114,8 @@ public TransportUpdateAction(
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService,
AutoCreateIndex autoCreateIndex,
NodeClient client
NodeClient client,
TransportShardBulkAction shardBulkAction
) {
super(
UpdateAction.NAME,
@@ -126,6 +131,7 @@ public TransportUpdateAction(
this.autoCreateIndex = autoCreateIndex;
this.client = client;
this.clusterService = clusterService;
this.shardBulkAction = shardBulkAction;
}

@Override
@@ -229,53 +235,88 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
shardOperation(request, listener, 0);
}

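/**
 * Wraps a single write request in a one-item BulkShardRequest and executes it directly
 * on the shard via TransportShardBulkAction, carrying over the refresh policy, timeout,
 * and parent task from the originating update request.
 */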
private void executeBulkItemOnShard(
DocWriteRequest<?> docWriteRequest,
UpdateRequest request,
ShardId shardId,
ActionListener<BulkShardResponse> listener
) {
BulkItemRequest bulkItemRequest = new BulkItemRequest(0, docWriteRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
request.getRefreshPolicy(),
new BulkItemRequest[] { bulkItemRequest }
);
bulkShardRequest.timeout(request.timeout());
bulkShardRequest.setParentTask(request.getParentTask());
shardBulkAction.execute(bulkShardRequest, listener);
}

protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) {
final ShardId shardId = request.getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis);
switch (result.getResponseResult()) {
case CREATED:
case CREATED: {
IndexRequest upsertRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice; it's already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
Tuple<? extends MediaType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
upsertSourceBytes,
true,
upsertRequest.getContentType()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
sourceAndContent.v2(),
sourceAndContent.v1(),
upsertSourceBytes
)

executeBulkItemOnShard(upsertRequest, request, shardId, new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkResponse) {
BulkItemResponse itemResponse = bulkResponse.getResponses()[0];
if (itemResponse.isFailed()) {
handleUpdateFailureWithRetry(listener, request, itemResponse.getFailure().getCause(), retryCount);
return;
}

IndexResponse response = (IndexResponse) itemResponse.getResponse();
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
} else {
update.setGetResult(null);

if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
Tuple<? extends MediaType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
upsertSourceBytes,
true,
upsertRequest.getContentType()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
sourceAndContent.v2(),
sourceAndContent.v1(),
upsertSourceBytes
)
);
} else {
update.setGetResult(null);
}
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}

@Override
public void onFailure(Exception e) {
handleUpdateFailureWithRetry(listener, request, e, retryCount);
}
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))));
});

break;
case UPDATED:
}
case UPDATED: {
IndexRequest indexRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice; it's already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
@@ -288,60 +329,94 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
+ "] has a default ingest pipeline or a final ingest pipeline, the support of the ingest pipelines for update operation causes unexpected result and will be removed in 3.0.0"
);
}
client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
Member:

Given we use bulk today, ingest pipelines are applied when users rely on the _update API for the documents generated here as part of the indexing request. With this change we will no longer be able to leverage that, since we are not going through TransportBulkAction anymore.

This may end up being a breaking change for users if we delegate directly to the bulk shard action and do not apply the ingest pipeline operations.

Member:

That said, I don't think the _bulk API applies ingest pipelines on update operations either, so this change may end up unifying the behavior of the bulk and update APIs.
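For context, an update action submitted through _bulk takes the following shape (illustrative request, not taken from this PR):

POST _bulk
{ "update": { "_index": "index", "_id": "1" } }
{ "doc": { "name": "bruce" } }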

Member:

Given this is a breaking change in terms of user expectations, I don't think it would be OK to make this change unless we enable ingest pipeline support here. @shwetathareja, thoughts?

Contributor (author):

If we enable ingest here, would it not defeat the optimisation that this change sets out to achieve?

Member:

Steps to see the difference in the final document:

PUT _ingest/pipeline/mypipeline
{
    "description": "This pipeline processes student data",
    "processors": [
        {
            "set": {
                "description": "Sets the graduation year to 2023",
                "field": "grad_year",
                "value": 2023
            }
        },
        {
            "set": {
                "description": "Sets graduated to true",
                "field": "graduated",
                "value": true
            }
        },
        {
            "uppercase": {
                "field": "name"
            }
        }
    ]
}
PUT index
{
    "settings": {
        "index.default_pipeline": "mypipeline",
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}
PUT index/_doc/1
{
    "name": "Clark"
}
POST index/_update/1
{
  "doc": {
    "name" : "Bruce",
    "last_name" : "Wayne"
  }
}
GET index/_doc/1

Final output without this change:
{
    "_index": "index",
    "_id": "1",
    "_version": 2,
    "_seq_no": 1,
    "_primary_term": 1,
    "found": true,
    "_source": {
        "graduated": true,
        "name": "BRUCE", <-- ingest pipeline applied to change the field's value to upper case
        "last_name": "Wayne",
        "grad_year": 2023
    }
}
Final output with this change:
{
    "_index": "index",
    "_id": "1",
    "_version": 2,
    "_seq_no": 1,
    "_primary_term": 1,
    "found": true,
    "_source": {
        "graduated": true,
        "name": "Bruce",  <-- see the original input case being persisted due to no ingest pipeline triggered now
        "grad_year": 2023,
        "last_name": "Wayne"
    }
}

Member:

Yes, I don't think we can introduce a breaking change here, as users might be relying on ingest pipeline processing as a feature.
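A minimal sketch of the direction discussed above, assuming one wanted to keep the fast path without silently dropping pipeline processing. The guard class and its wiring are hypothetical; only IndexSettings.DEFAULT_PIPELINE and IndexSettings.FINAL_PIPELINE are real OpenSearch settings:

import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

/**
 * Hypothetical guard (illustrative only, not part of this change): the update fast path
 * may only bypass the coordinating TransportBulkAction hop when the target index has no
 * ingest pipelines configured, so pipeline processors are never silently skipped.
 */
final class IngestPipelineGuard {
    private IngestPipelineGuard() {}

    static boolean canBypassCoordinatingBulk(Settings indexSettings) {
        // An index with a default or final pipeline expects processors to run on every write.
        return IndexSettings.DEFAULT_PIPELINE.exists(indexSettings) == false
            && IndexSettings.FINAL_PIPELINE.exists(indexSettings) == false;
    }
}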

UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),

executeBulkItemOnShard(indexRequest, request, shardId, new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkResponse) {
BulkItemResponse itemResponse = bulkResponse.getResponses()[0];
if (itemResponse.isFailed()) {
handleUpdateFailureWithRetry(listener, request, itemResponse.getFailure().getCause(), retryCount);
return;
}

IndexResponse response = (IndexResponse) itemResponse.getResponse();
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
result.updatedSourceAsMap(),
result.updateSourceContentType(),
indexSourceBytes
)
);
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))));
response.getResult()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
result.updatedSourceAsMap(),
result.updateSourceContentType(),
indexSourceBytes
)
);
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}

@Override
public void onFailure(Exception e) {
handleUpdateFailureWithRetry(listener, request, e, retryCount);
}
});
break;
case DELETED:
}
case DELETED: {
DeleteRequest deleteRequest = result.action();
client.bulk(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
response.getResult()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),

executeBulkItemOnShard(deleteRequest, request, shardId, new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkResponse) {
BulkItemResponse itemResponse = bulkResponse.getResponses()[0];
if (itemResponse.isFailed()) {
handleUpdateFailureWithRetry(listener, request, itemResponse.getFailure().getCause(), retryCount);
return;
}

DeleteResponse response = (DeleteResponse) itemResponse.getResponse();
UpdateResponse update = new UpdateResponse(
response.getShardInfo(),
response.getShardId(),
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
result.updatedSourceAsMap(),
result.updateSourceContentType(),
null
)
);
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))));
response.getResult()
);
update.setGetResult(
UpdateHelper.extractGetResult(
request,
request.concreteIndex(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getVersion(),
result.updatedSourceAsMap(),
result.updateSourceContentType(),
null
)
);
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}

@Override
public void onFailure(Exception e) {
handleUpdateFailureWithRetry(listener, request, e, retryCount);
}
});
break;
}
case NOOP:
UpdateResponse update = result.action();
IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
server/src/test/java/org/opensearch/action/update/TransportUpdateActionTests.java
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.update;

import org.opensearch.action.bulk.TransportShardBulkAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.node.NodeClient;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportUpdateActionTests extends OpenSearchTestCase {

/**
 * Test that TransportUpdateAction is wired with TransportShardBulkAction directly.
 * Reflection is off-limits under the forbidden-APIs checks, so the wiring is verified
 * by constructing the action with a mocked TransportShardBulkAction.
 */
public void testUpdateActionUsesShardBulkActionDirectly() {
// Mock dependencies
ThreadPool threadPool = mock(ThreadPool.class);
ClusterService clusterService = mock(ClusterService.class);
TransportService transportService = mock(TransportService.class);
UpdateHelper updateHelper = mock(UpdateHelper.class);
ActionFilters actionFilters = mock(ActionFilters.class);
IndexNameExpressionResolver resolver = mock(IndexNameExpressionResolver.class);
IndicesService indicesService = mock(IndicesService.class);
NodeClient client = mock(NodeClient.class);
TransportShardBulkAction shardBulkAction = mock(TransportShardBulkAction.class);

// Create AutoCreateIndex with proper settings
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
AutoCreateIndex autoCreateIndex = new AutoCreateIndex(Settings.EMPTY, clusterSettings, resolver, null);

// Create the action - this verifies the constructor accepts TransportShardBulkAction
TransportUpdateAction action = new TransportUpdateAction(
threadPool,
clusterService,
transportService,
updateHelper,
actionFilters,
resolver,
indicesService,
autoCreateIndex,
client,
shardBulkAction
);

// Reaching this point means the constructor accepts the TransportShardBulkAction
// parameter, which keeps the refactoring from regressing.
assertNotNull(action);
}
}