Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tracing Instrumentation] Add instrumentation in InboundHandler #10143

Merged
merged 10 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122))
- Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246))
- Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))
- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143))
- Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_TRANSPORT_NAME,
Expand All @@ -108,7 +109,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Netty4NioSocketChannel;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -131,9 +132,10 @@ public Netty4Transport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void startThreadPool() {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
transport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
PageCacheRecycler.NON_RECYCLING_INSTANCE,
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() {
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
writableRegistry(),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
) {
@Override
public TransportAddress[] addressesFromString(String address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() {
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public void addAttribute(String key, Boolean value) {

@Override
public void setError(Exception exception) {
delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage());
if (exception != null) {
delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.nio.NioSelector;
import org.opensearch.nio.NioSocketChannel;
import org.opensearch.nio.ServerChannelContext;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpTransport;
import org.opensearch.transport.TransportSettings;
Expand Down Expand Up @@ -84,9 +85,10 @@ protected NioTransport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
NioGroupFactory groupFactory
NioGroupFactory groupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
this.groupFactory = groupFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NIO_TRANSPORT_NAME,
Expand All @@ -103,7 +104,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getNioGroupFactory(settings)
getNioGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -81,7 +82,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
new MockPageCacheRecycler(settings),
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new NioGroupFactory(settings, logger)
new NioGroupFactory(settings, logger),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public NetworkModule(
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
networkService
networkService,
tracer
);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ default Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ private AttributeNames() {
*/
public static final String TRANSPORT_TARGET_HOST = "target_host";

/**
* Transport Service send request local host.
*/
public static final String TRANSPORT_HOST = "host";

/**
* Action Name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;

import java.util.Arrays;
Expand Down Expand Up @@ -127,4 +128,26 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio
return attributes;
}

/**
* Creates {@link SpanCreationContext} from Inbound Handler.
* @param action action.
* @param tcpChannel tcp channel.
* @return context
*/
public static SpanCreationContext from(String action, TcpChannel tcpChannel) {
return SpanCreationContext.server().name(createSpanName(action, tcpChannel)).attributes(buildSpanAttributes(action, tcpChannel));
}

private static String createSpanName(String action, TcpChannel tcpChannel) {
return action + SEPARATOR + (tcpChannel.getRemoteAddress() != null
? tcpChannel.getRemoteAddress().getHostString()
: tcpChannel.getLocalAddress().getHostString());
}

private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) {
Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, action);
attributes.addAttribute(AttributeNames.TRANSPORT_HOST, tcpChannel.getLocalAddress().getHostString());
return attributes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.channels;

import org.opensearch.Version;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.BaseTcpTransportChannel;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.TcpTransportChannel;
import org.opensearch.transport.TransportChannel;

import java.io.IOException;

/**
* Tracer wrapped {@link TransportChannel}
*/
public class TraceableTcpTransportChannel extends BaseTcpTransportChannel {

private final TransportChannel delegate;
private final Span span;
private final Tracer tracer;

/**
* Constructor.
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
public TraceableTcpTransportChannel(TcpTransportChannel delegate, Span span, Tracer tracer) {
super(delegate.getChannel());
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
}

/**
* Factory method.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
* @param tcpChannel tcpChannel
* @return transport channel
*/
public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) {
reta marked this conversation as resolved.
Show resolved Hide resolved
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
tcpChannel.addCloseListener(new ActionListener<Void>() {
reta marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onResponse(Void unused) {
onFailure(null);
}

@Override
public void onFailure(Exception e) {
span.addEvent("The TransportChannel was closed without sending the response");
span.setError(e);
span.endSpan();
}
});

return new TraceableTcpTransportChannel(delegate, span, tracer);
} else {
return delegate;
}
}

@Override
public String getProfileName() {
return delegate.getProfileName();
}

@Override
public String getChannelType() {
return delegate.getChannelType();
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(response);
} catch (final IOException ex) {
span.setError(ex);
throw ex;
} finally {
span.endSpan();
}
}

@Override
public void sendResponse(Exception exception) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(exception);
} finally {
span.setError(exception);
span.endSpan();
}
}

@Override
public Version getVersion() {
return delegate.getVersion();
}
}
Loading