Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add X-Opaque-Id to search request metadata for query insights #13374

Merged
merged 5 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -138,6 +139,15 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID);
if (userProvidedLabel != null) {
labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
}
attributes.put(Attribute.LABELS, labels);
// construct SearchQueryRecord from attributes and measurements
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public enum Attribute {
/**
* The node id for this request
*/
NODE_ID;
NODE_ID,
/**
* Custom search request labels
*/
LABELS;

/**
* Read an Attribute from a StreamInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,39 @@
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -48,6 +59,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
private final ThreadPool threadPool = mock(ThreadPool.class);
private ClusterService clusterService;

@Before
Expand All @@ -61,15 +73,21 @@ public void setup() {
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);

ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>()));
when(threadPool.getThreadContext()).thenReturn(threadContext);
}

@SuppressWarnings("unchecked")
public void testOnRequestEnd() throws InterruptedException {
Long timestamp = System.currentTimeMillis() - 100L;
SearchType searchType = SearchType.QUERY_THEN_FETCH;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

Expand All @@ -89,10 +107,19 @@ public void testOnRequestEnd() throws InterruptedException {
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);
ArgumentCaptor<SearchQueryRecord> captor = ArgumentCaptor.forClass(SearchQueryRecord.class);

queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);

verify(queryInsightsService, times(1)).addRecord(any());
verify(queryInsightsService, times(1)).addRecord(captor.capture());
SearchQueryRecord generatedRecord = captor.getValue();
assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
Expand All @@ -102,6 +129,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

Expand All @@ -121,6 +149,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);

int numRequests = 50;
Thread[] threads = new Thread[numRequests];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@
);
}
}

public SearchRequest getRequest() {
return searchRequest;

Check warning on line 112 in server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java#L112

Added line #L112 was not covered by tests
}
}

enum ShardStatsFieldNames {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
this.enabled = enabled;
}

protected abstract void onPhaseStart(SearchPhaseContext context);
protected void onPhaseStart(SearchPhaseContext context) {};

Check warning on line 44 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L44

Added line #L44 was not covered by tests

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {};

Check warning on line 46 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L46

Added line #L46 was not covered by tests

protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {};

Check warning on line 48 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L48

Added line #L48 was not covered by tests
msfroh marked this conversation as resolved.
Show resolved Hide resolved

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down
Loading