Skip to content

Commit

Permalink
Introduce ConcurrentQueryProfiler to profile query using concurrent s…
Browse files Browse the repository at this point in the history
…egment search path and support concurrency during rewrite and create weight (opensearch-project#10352)

* Fix timer race condition in profile rewrite and create weight for concurrent segment search (opensearch-project#10352)

Signed-off-by: Ticheng Lin <ticheng@amazon.com>

* Refactor and work on the PR comments (opensearch-project#10352)

Signed-off-by: Ticheng Lin <ticheng@amazon.com>

---------

Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws authored and sohami committed Oct 30, 2023
1 parent 2796681 commit 1bd7fe1
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,89 @@ public void testBoosting() throws Exception {
}
}

/**
 * Indexes 122 documents, runs a profiled boosting query built from an ids query and a terms query, and
 * verifies the query and collector profile results reported for every shard. The expected slice timings
 * and breakdown sizes depend on whether concurrent search is enabled and on whether the profile result
 * is the first one in the list.
 */
public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
    createIndex("test");
    ensureGreen();

    final int docCount = 122;
    final IndexRequestBuilder[] indexRequests = new IndexRequestBuilder[docCount];
    for (int docId = 0; docId < docCount; docId++) {
        indexRequests[docId] = client().prepareIndex("test")
            .setId(String.valueOf(docId))
            .setSource("field1", English.intToEnglish(docId), "field2", docId);
    }

    final List<String> terms = Arrays.asList("zero", "zero", "one");

    indexRandom(true, indexRequests);

    refresh();

    // The random values are drawn in the same order as before: two ints for the ids, then boost, then negative boost.
    final QueryBuilder idsClause = QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt()));
    final QueryBuilder termsClause = QueryBuilders.termsQuery("field1", terms);
    final QueryBuilder query = QueryBuilders.boostingQuery(idsClause, termsClause).boost(randomFloat()).negativeBoost(randomFloat());
    logger.info("Query: {}", query);

    final SearchResponse response = client().prepareSearch()
        .setQuery(query)
        .setTrackTotalHits(true)
        .setProfile(true)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .get();

    final Map<String, ProfileShardResult> shardProfiles = response.getProfileResults();
    assertNotNull("Profile response element should not be null", shardProfiles);
    assertThat("Profile response should not be an empty array", shardProfiles.size(), not(0));

    for (ProfileShardResult shardProfile : shardProfiles.values()) {
        assertThat(shardProfile.getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
        assertThat(shardProfile.getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));

        for (QueryProfileShardResult queryProfile : shardProfile.getQueryProfileResults()) {
            final List<ProfileResult> profileResults = queryProfile.getQueryResults();
            for (ProfileResult profileResult : profileResults) {
                assertNotNull(profileResult.getQueryName());
                assertNotNull(profileResult.getLuceneDescription());
                assertThat(profileResult.getTime(), greaterThan(0L));

                final Map<String, Long> timeBreakdown = profileResult.getTimeBreakdown();
                final Long sliceMax = profileResult.getMaxSliceTime();
                final Long sliceMin = profileResult.getMinSliceTime();
                final Long sliceAvg = profileResult.getAvgSliceTime();

                if (!concurrentSearchEnabled) {
                    // Non-concurrent search: no slice level statistics are reported at all.
                    assertThat(sliceMax, is(nullValue()));
                    assertThat(sliceMin, is(nullValue()));
                    assertThat(sliceAvg, is(nullValue()));
                    assertThat(timeBreakdown.size(), equalTo(27));
                } else if (profileResults.get(0).equals(profileResult)) {
                    // Concurrent search, first profile result: slice statistics and per timing type max/min/avg entries.
                    assertNotNull(sliceMax);
                    assertNotNull(sliceMin);
                    assertNotNull(sliceAvg);
                    assertThat(timeBreakdown.size(), equalTo(66));
                    for (QueryTimingType timingType : QueryTimingType.values()) {
                        if (timingType == QueryTimingType.CREATE_WEIGHT) {
                            continue;
                        }
                        final String maxKey = MAX_PREFIX + timingType;
                        final String minKey = MIN_PREFIX + timingType;
                        final String avgKey = AVG_PREFIX + timingType;
                        assertNotNull(timeBreakdown.get(maxKey));
                        assertNotNull(timeBreakdown.get(minKey));
                        assertNotNull(timeBreakdown.get(avgKey));
                        assertNotNull(timeBreakdown.get(maxKey + TIMING_TYPE_COUNT_SUFFIX));
                        assertNotNull(timeBreakdown.get(minKey + TIMING_TYPE_COUNT_SUFFIX));
                        assertNotNull(timeBreakdown.get(avgKey + TIMING_TYPE_COUNT_SUFFIX));
                    }
                } else {
                    // Concurrent search, any later profile result: slice times are zero and the breakdown is the plain one.
                    assertThat(sliceMax, equalTo(0L));
                    assertThat(sliceMin, equalTo(0L));
                    assertThat(sliceAvg, equalTo(0L));
                    assertThat(timeBreakdown.size(), equalTo(27));
                }
            }

            final CollectorResult collectorProfile = queryProfile.getCollectorResult();
            assertThat(collectorProfile.getName(), is(not(emptyOrNullString())));
            assertThat(collectorProfile.getTime(), greaterThan(0L));
        }
    }
}

public void testDisMaxRange() throws Exception {
createIndex("test");
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
import org.opensearch.search.profile.query.InternalQueryProfileTree;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
Expand Down Expand Up @@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
: new QueryProfiler(new InternalQueryProfileTree());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ public class Timer {
private boolean doTiming;
private long timing, count, lastCount, start, earliestTimerStartTime;

/**
 * Creates a timer whose timing, count, last count, start and earliest start time are all zero.
 */
public Timer() {
    this(0L, 0L, 0L, 0L, 0L);
}

/**
 * Creates a timer whose internal state is initialised from the given values.
 *
 * @param timing initial value of the {@code timing} field
 * @param count initial value of the {@code count} field
 * @param lastCount initial value of the {@code lastCount} field
 * @param start initial value of the {@code start} field
 * @param earliestTimerStartTime initial value of the {@code earliestTimerStartTime} field
 */
public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
    this.earliestTimerStartTime = earliestTimerStartTime;
    this.start = start;
    this.lastCount = lastCount;
    this.count = count;
    this.timing = timing;
}

/** pkg-private for testing */
long nanoTime() {
return System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,11 @@ public void startRewriteTime() {
* startRewriteTime() must be called for a particular context prior to calling
* stopAndAddRewriteTime(), otherwise the elapsed time will be negative and
* nonsensical
*
* @return The elapsed time
*/
public long stopAndAddRewriteTime() {
public void stopAndAddRewriteTime() {
long time = Math.max(1, System.nanoTime() - rewriteScratch);
rewriteTime += time;
rewriteScratch = 0;
return time;
}

public long getRewriteTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,29 @@ public Map<String, Long> toBreakdownMap() {
);
final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString());

if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) {
if (contexts.isEmpty()) {
// If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the
// create_weight time/count
queryNodeTime = createWeightTime;
maxSliceNodeTime = 0L;
minSliceNodeTime = 0L;
avgSliceNodeTime = 0L;
return buildDefaultQueryBreakdownMap(createWeightTime);
} else if (sliceCollectorsToLeaves.isEmpty()) {
// This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It
// creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for
// the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later
// in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no
// concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination.
AbstractProfileBreakdown<QueryTimingType> breakdown = contexts.values().iterator().next();
queryNodeTime = breakdown.toNodeTime() + createWeightTime;
maxSliceNodeTime = 0L;
minSliceNodeTime = 0L;
avgSliceNodeTime = 0L;
Map<String, Long> queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap());
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime);
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L);
return queryBreakdownMap;
}

// first create the slice level breakdowns
Expand Down Expand Up @@ -191,10 +206,12 @@ Map<Collector, Map<String, Long>> buildSliceLevelBreakdown() {
}
// compute sliceMaxEndTime as max of sliceEndTime across all timing types
sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE));
sliceMinStartTime = Math.min(
sliceMinStartTime,
currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE)
);
long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE);
if (currentSliceStartTime == 0L) {
// The timer for the current timing type never starts, so we continue here
continue;
}
sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime);
// compute total time for each timing type at slice level using sliceEndTime and sliceStartTime
currentSliceBreakdown.put(
timingType.toString(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.profile.query;

import org.apache.lucene.search.Query;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.Timer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Query profiler that keeps its profiling state per thread: every thread that profiles a query gets its own
 * {@link ConcurrentQueryProfileTree} and its own list of rewrite {@link Timer}s, both keyed by the thread id.
 * The per-thread state is combined when {@link #getTree()} or {@link #getRewriteTime()} is called.
 *
 * @opensearch.internal
 */
public final class ConcurrentQueryProfiler extends QueryProfiler {

    // Insertion-ordered map wrapped with Collections.synchronizedMap. Single operations are synchronized by the
    // wrapper, but traversing any of its collection views requires holding the map's own monitor.
    private final Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;
    // The LinkedList does not need to be thread safe, as the map associates thread IDs with LinkedList, and only
    // one thread will access the LinkedList at a time.
    private final Map<Long, LinkedList<Timer>> threadToRewriteTimers;

    /**
     * Creates the profiler and registers the given tree, together with an empty rewrite timer list, for the
     * constructing thread.
     *
     * @param profileTree the profile tree of the constructing thread; it is cast to {@link ConcurrentQueryProfileTree}
     */
    public ConcurrentQueryProfiler(AbstractQueryProfileTree profileTree) {
        super(profileTree);
        long threadId = getCurrentThreadId();
        // We utilize LinkedHashMap to preserve the insertion order of the profiled queries
        threadToProfileTree = Collections.synchronizedMap(new LinkedHashMap<>());
        threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree);
        threadToRewriteTimers = new ConcurrentHashMap<>();
        threadToRewriteTimers.put(threadId, new LinkedList<>());
    }

    /**
     * Returns the profile breakdown for the query from the calling thread's profile tree, creating that tree
     * first if the calling thread does not have one yet.
     */
    @Override
    public ContextualProfileBreakdown<QueryTimingType> getQueryBreakdown(Query query) {
        ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent(
            getCurrentThreadId(),
            k -> new ConcurrentQueryProfileTree()
        );
        return profileTree.getProfileBreakdown(query);
    }

    /**
     * Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack of the calling thread.
     * Does nothing if the calling thread has no profile tree.
     */
    @Override
    public void pollLastElement() {
        ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId());
        if (concurrentProfileTree != null) {
            concurrentProfileTree.pollLast();
        }
    }

    /**
     * @return a hierarchical representation of the profiled tree, concatenating the results of every thread's
     *         tree in the order in which the trees were registered
     */
    @Override
    public List<ProfileResult> getTree() {
        // Collections.synchronizedMap requires the caller to hold the map's monitor while traversing its views.
        // Take a snapshot under the lock and build the results outside of it so no tree code runs under the lock.
        final List<ConcurrentQueryProfileTree> profileTrees;
        synchronized (threadToProfileTree) {
            profileTrees = new ArrayList<>(threadToProfileTree.values());
        }
        List<ProfileResult> profileResults = new ArrayList<>();
        for (ConcurrentQueryProfileTree tree : profileTrees) {
            profileResults.addAll(tree.getTree());
        }
        return profileResults;
    }

    /**
     * Begin timing the rewrite phase of a request. A new timer is appended to the calling thread's timer list
     * and started.
     */
    @Override
    public void startRewriteTime() {
        Timer rewriteTimer = new Timer();
        threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer);
        rewriteTimer.start();
    }

    /**
     * Stop recording the current rewrite timer, i.e. the timer most recently added by the calling thread.
     *
     * @throws IllegalStateException if {@link #startRewriteTime()} was never called on the calling thread
     */
    @Override
    public void stopAndAddRewriteTime() {
        LinkedList<Timer> rewriteTimers = threadToRewriteTimers.get(getCurrentThreadId());
        if (rewriteTimers == null || rewriteTimers.isEmpty()) {
            throw new IllegalStateException("stopAndAddRewriteTime() called without a matching startRewriteTime() on this thread");
        }
        rewriteTimers.getLast().stop();
    }

    /**
     * @return total time taken to rewrite all queries in this concurrent query profiler. Rewrite intervals of
     *         all threads are merged first, so time during which several rewrites overlap is counted once.
     */
    @Override
    public long getRewriteTime() {
        List<Timer> rewriteTimers = new ArrayList<>();
        threadToRewriteTimers.values().forEach(rewriteTimers::addAll);
        long totalRewriteTime = 0L;
        for (long[] interval : mergeRewriteTimeIntervals(rewriteTimers)) {
            totalRewriteTime += interval[1] - interval[0];
        }
        return totalRewriteTime;
    }

    /**
     * Merges the {@code [start, end]} intervals described by the timers, where start is the timer's earliest
     * start time and end is start plus the timer's approximate timing. Intervals that overlap or touch are
     * combined into one. Note that the given list is sorted in place by earliest start time.
     *
     * @param timers the timers to merge; must be a modifiable list
     * @return the merged intervals ordered by start time, each as a two element array {@code {start, end}}
     */
    // package private for unit testing
    LinkedList<long[]> mergeRewriteTimeIntervals(List<Timer> timers) {
        LinkedList<long[]> mergedIntervals = new LinkedList<>();
        timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime));
        for (Timer timer : timers) {
            long startTime = timer.getEarliestTimerStartTime();
            long endTime = startTime + timer.getApproximateTiming();
            if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) {
                // Strictly after the previous interval: open a new one.
                mergedIntervals.add(new long[] { startTime, endTime });
            } else {
                // Overlaps or touches the previous interval: extend it if this timer ends later.
                long[] lastInterval = mergedIntervals.getLast();
                lastInterval[1] = Math.max(lastInterval[1], endTime);
            }
        }
        return mergedIntervals;
    }

    private long getCurrentThreadId() {
        return Thread.currentThread().getId();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@
*
* @opensearch.internal
*/
public final class QueryProfiler extends AbstractProfiler<ContextualProfileBreakdown<QueryTimingType>, Query> {
public class QueryProfiler extends AbstractProfiler<ContextualProfileBreakdown<QueryTimingType>, Query> {

/**
* The root Collector used in the search
*/
private InternalProfileComponent collector;

public QueryProfiler(boolean concurrent) {
super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree());
public QueryProfiler(AbstractQueryProfileTree profileTree) {
super(profileTree);
}

/** Set the collector that is associated with this profiler. */
Expand All @@ -81,14 +81,14 @@ public void startRewriteTime() {
/**
* Stop recording the current rewrite and add its time to the total tally.
*
* @return cumulative rewrite time
*/
public long stopAndAddRewriteTime() {
return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime();
public void stopAndAddRewriteTime() {
((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime();
}

/**
* The rewriting process is complex and hard to display because queries can undergo significant changes.
* Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case.
* @return total time taken to rewrite all queries in this profile
*/
public long getRewriteTime() {
Expand Down
Loading

0 comments on commit 1bd7fe1

Please sign in to comment.