Skip to content

Commit

Permalink
[BUG] The thread context is not properly cleared and messes up the tr…
Browse files Browse the repository at this point in the history
…aces (opensearch-project#10873)

* [BUG] The thread context is not properly cleared and messes up the traces

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

---------

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta authored Nov 20, 2023
1 parent b0d6b3c commit 00517eb
Show file tree
Hide file tree
Showing 22 changed files with 338 additions and 141 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993))
- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt
public static SpanScope create(Span span, TracerContextStorage<String, Span> tracerContextStorage) {
final SpanScope beforeSpanScope = spanScopeThreadLocal.get();
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage);
spanScopeThreadLocal.set(newSpanScope);
return newSpanScope;
}

@Override
public void close() {
detach();
spanScopeThreadLocal.set(previousSpanScope);
}

@Override
public SpanScope attach() {
spanScopeThreadLocal.set(this);
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
return this;
}

private void detach() {
spanScopeThreadLocal.set(previousSpanScope);
if (previousSpanScope != null) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -53,7 +53,6 @@ public Span startSpan(SpanCreationContext context) {
parentSpan = getCurrentSpanInternal();
}
Span span = createSpan(context, parentSpan);
setCurrentSpanInContext(span);
addDefaultAttributes(span);
return span;
}
Expand Down Expand Up @@ -94,10 +93,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan
return tracingTelemetry.createSpan(spanCreationContext, parentSpan);
}

private void setCurrentSpanInContext(Span span) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span);
}

/**
* Adds default attributes in the span
* @param span the current active span
Expand All @@ -107,7 +102,7 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.http.HttpTracer;
import org.opensearch.telemetry.tracing.transport.TransportTracer;

import java.io.Closeable;

Expand All @@ -22,7 +22,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public interface Tracer extends HttpTracer, Closeable {
public interface Tracer extends TransportTracer, Closeable {
/**
* Starts the {@link Span} with given {@link SpanCreationContext}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand All @@ -36,7 +36,7 @@ public interface TracingContextPropagator {
* @param headers request headers to extract the context from
* @return current span
*/
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);
Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers);

/**
* Injects tracing context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public void close() {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
* TransportTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HTTP or TCP transport headers and propagate the span accordingly.
* <p>
* All methods on the Tracer object are multi-thread safe.
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface HttpTracer {
public interface TransportTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanCreationContext span name.
* @param header http request header.
* @return span.
* @param headers transport headers
* @return the span instance
*/
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Contains No-op implementations
* Contains HTTP or TCP transport related tracer capabilities
*/
package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase {
private Span mockSpan;
private Span mockParentSpan;

private SpanScope mockSpanScope;
private ThreadPool threadPool;
private ExecutorService executorService;
private SpanCreationContext spanCreationContext;
Expand Down Expand Up @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1"));
assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2"));
assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3"));
assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4"));
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
assertEquals(1.0, ((MockSpan) span).getAttribute("key1"));
assertEquals(2l, ((MockSpan) span).getAttribute("key2"));
assertEquals(true, ((MockSpan) span).getAttribute("key3"));
assertEquals("key4", ((MockSpan) span).getAttribute("key4"));
span.endSpan();
}

Expand All @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() {

Span span = defaultTracer.startSpan(spanCreationContext, null);

SpanContext parentSpan = defaultTracer.getCurrentSpan();

SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());
try (final SpanScope scope = defaultTracer.withSpanInScope(span)) {
SpanContext parentSpan = defaultTracer.getCurrentSpan();

Span span1 = defaultTracer.startSpan(spanCreationContext1);
SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());

assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
span1.endSpan();
span.endSpan();
try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) {
assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}
} finally {
span.endSpan();
}
}

@SuppressWarnings("unchecked")
Expand All @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
span.endSpan();
}

Expand Down Expand Up @@ -403,7 +405,6 @@ private void setupMocks() {
mockTracingTelemetry = mock(TracingTelemetry.class);
mockSpan = mock(Span.class);
mockParentSpan = mock(Span.class);
mockSpanScope = mock(SpanScope.class);
mockTracerContextStorage = mock(TracerContextStorage.class);
when(mockSpan.getSpanName()).thenReturn("span_name");
when(mockSpan.getSpanId()).thenReturn("span_id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {

// Create Index and ingest data
String indexName = "test-index-11";
Settings basicSettings = Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build();
Settings basicSettings = Settings.builder()
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
.put("index.routing.allocation.total_shards_per_node", 1)
.build();
createIndex(indexName, basicSettings);
indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well"));
indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field2", "another fox did the same."));

indexRandom(false, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well"));
indexRandom(false, client.prepareIndex(indexName).setId("2").setSource("field2", "another fox did the same."));

ensureGreen();
refresh();

// Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs.
client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("fox")).get();
client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("jumps")).get();

ensureGreen();
refresh();
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get();
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get();

// Sleep for about 3s to wait for traces are published, delay is (the delay is 1s).
Thread.sleep(3000);
Expand All @@ -88,8 +90,10 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {
)
);

// See please https://github.com/opensearch-project/OpenSearch/issues/10291 till local transport is not instrumented,
// capturing only the inter-nodes transport actions.
InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
validators.validate(exporter.getFinishedSpanItems(), 6);
validators.validate(exporter.getFinishedSpanItems(), 4);
}

private static void updateTelemetrySetting(Client client, boolean value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import org.opensearch.core.common.Strings;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -51,7 +51,7 @@ private static OTelPropagatedSpan getPropagatedSpan(Context context) {
}

@Override
public Optional<Span> extractFromHeaders(Map<String, List<String>> headers) {
public Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}
Expand Down Expand Up @@ -87,9 +87,9 @@ public String get(Map<String, String> headers, String key) {
}
};

private static final TextMapGetter<Map<String, List<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
private static final TextMapGetter<Map<String, Collection<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
@Override
public Iterable<String> keys(Map<String, List<String>> headers) {
public Iterable<String> keys(Map<String, Collection<String>> headers) {
if (headers != null) {
return headers.keySet();
} else {
Expand All @@ -98,7 +98,7 @@ public Iterable<String> keys(Map<String, List<String>> headers) {
}

@Override
public String get(Map<String, List<String>> headers, String key) {
public String get(Map<String, Collection<String>> headers, String key) {
if (headers != null && headers.containsKey(key)) {
return Strings.collectionToCommaDelimitedString(headers.get(key));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.OpenTelemetry;
Expand Down Expand Up @@ -57,7 +57,7 @@ public void testExtractTracerContextFromHeader() {
}

public void testExtractTracerContextFromHttpHeader() {
Map<String, List<String>> requestHeaders = new HashMap<>();
Map<String, Collection<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -362,7 +364,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
* @param httpChannel that received the http request
*/
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), httpRequest.getHeaders());
final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), extractHeaders(httpRequest.getHeaders()));
try (final SpanScope httpRequestSpanScope = tracer.withSpanInScope(span)) {
HttpChannel traceableHttpChannel = TraceableHttpChannel.create(httpChannel, span, tracer);
handleIncomingRequest(httpRequest, traceableHttpChannel, httpRequest.getInboundException());
Expand Down Expand Up @@ -483,4 +485,9 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
return NO_OP;
}
}

@SuppressWarnings("unchecked")
private static <Values extends Collection<String>> Map<String, Collection<String>> extractHeaders(Map<String, Values> headers) {
return (Map<String, Collection<String>>) headers;
}
}
Loading

0 comments on commit 00517eb

Please sign in to comment.