Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing instrumentation for indexing paths #10273

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -133,6 +138,7 @@
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;
private final Tracer tracer;

@Inject
public TransportBulkAction(
Expand All @@ -147,7 +153,8 @@
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
this(
threadPool,
Expand All @@ -162,7 +169,8 @@
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
System::nanoTime,
tracer
);
}

Expand All @@ -179,7 +187,8 @@
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeProvider,
Tracer tracer
) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
Expand All @@ -196,6 +205,7 @@
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
this.tracer = tracer;
}

/**
Expand Down Expand Up @@ -642,52 +652,66 @@
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
rayshrey marked this conversation as resolved.
Show resolved Hide resolved
shardBulkAction.execute(
bulkShardRequest,
TraceableActionListener.create(ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}

@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
}
}, releasable::close));
private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)
)
);
}
}, releasable::close), span, tracer)
);
} catch (Exception e) {
span.setError(e);
span.endSpan();
throw e;

Check warning on line 713 in server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java#L710-L713

Added lines #L710 - L713 were not covered by tests
}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -146,6 +147,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
* term validation in presence of a new primary.
*/
private final String transportPrimaryTermValidationAction;
private final Tracer tracer;
reta marked this conversation as resolved.
Show resolved Hide resolved

@Inject
public TransportShardBulkAction(
Expand All @@ -161,7 +163,8 @@ public TransportShardBulkAction(
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteStorePressureService remoteStorePressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -177,12 +180,14 @@ public TransportShardBulkAction(
EXECUTOR_NAME_FUNCTION,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.remoteStorePressureService = remoteStorePressureService;
this.tracer = tracer;

this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -93,7 +94,8 @@ public TransportResyncReplicationAction(
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -109,7 +111,8 @@ public TransportResyncReplicationAction(
EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressureService,
systemIndices
systemIndices,
tracer
reta marked this conversation as resolved.
Show resolved Hide resolved
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
import org.opensearch.index.translog.Translog.Location;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -82,6 +87,7 @@ public abstract class TransportWriteAction<
protected final SystemIndices systemIndices;

private final Function<IndexShard, String> executorFunction;
private final Tracer tracer;

protected TransportWriteAction(
Settings settings,
Expand All @@ -97,7 +103,8 @@ protected TransportWriteAction(
Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
Expand All @@ -119,6 +126,7 @@ protected TransportWriteAction(
this.executorFunction = executorFunction;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
this.tracer = tracer;
}

protected String executor(IndexShard shard) {
Expand Down Expand Up @@ -220,7 +228,12 @@ protected void shardOperationOnPrimary(
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnPrimary(request, primary, listener);
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down Expand Up @@ -248,7 +261,12 @@ protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replic
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnReplica(request, replica, listener);
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -99,7 +100,8 @@ public RetentionLeaseSyncAction(
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexingPressureService indexingPressureService,
final SystemIndices systemIndices
final SystemIndices systemIndices,
final Tracer tracer
) {
super(
settings,
Expand All @@ -115,7 +117,8 @@ public RetentionLeaseSyncAction(
ignore -> ThreadPool.Names.MANAGEMENT,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,29 @@ private AttributeNames() {
* Action Name.
*/
public static final String TRANSPORT_ACTION = "action";

/**
* Index Name
*/
public static final String INDEX = "index";

/**
* Shard ID
*/
public static final String SHARD_ID = "shard_id";

/**
* Number of request items in bulk request
*/
public static final String BULK_REQUEST_ITEMS = "bulk_request_items";

/**
* Node ID
*/
public static final String NODE_ID = "node_id";

/**
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";
}
Loading
Loading