Skip to content

Commit

Permalink
Change INDEX_SEARCHER threadpool to resizable to support task resourc…
Browse files Browse the repository at this point in the history
…e tracking

Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
Jay Deng committed May 10, 2023
1 parent 6ac3317 commit cd88594
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Enable `./gradlew build` on MacOS by disabling bcw tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303))
- Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203))
- Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502))

### Deprecated

Expand All @@ -116,4 +117,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.admin.cluster.node.tasks;

import org.hamcrest.MatcherAssert;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

/**
* Integration tests for task management API
* <p>
* We need at least 2 nodes so we have a cluster-manager node a non-cluster-manager node
*/
public class ConcurrentSearchTasksIT extends TasksIT {

private static final int INDEX_SEARCHER_THREADS = 3;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.index_searcher.size", INDEX_SEARCHER_THREADS)
.put("thread_pool.index_searcher.queue_size", INDEX_SEARCHER_THREADS)
.build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true);
return featureSettings.build();
}

public void testConcurrentSearchTaskTracking() {
final String INDEX_NAME = "test";
final int NUM_SHARDS = 1;
final int NUM_DOCS = 7;

registerTaskManagerListeners(SearchAction.NAME); // coordinator task
registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen(INDEX_NAME); // Make sure all shards are allocated to catch replication tasks
indexDocumentsWithRefresh(INDEX_NAME, NUM_DOCS); // Concurrent search requires >5 segments or >250,000 docs to have concurrency, so we index 7 docs each in a different segment.
assertSearchResponse(client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get());

// the search operation should produce one coordinator task
List<TaskInfo> mainTask = findEvents(SearchAction.NAME, Tuple::v1);
assertEquals(1, mainTask.size());
TaskInfo mainTaskInfo = mainTask.get(0);
MatcherAssert.assertThat(mainTaskInfo.getDescription(), startsWith("indices[test], search_type["));
MatcherAssert.assertThat(mainTaskInfo.getDescription(), containsString("\"query\":{\"match_all\""));

// check that if we have any shard-level requests they all have non-zero length description
List<TaskInfo> shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1);
assertEquals(NUM_SHARDS, shardTasks.size()); // We should only have 1 shard search task per shard
for (TaskInfo taskInfo : shardTasks) {
MatcherAssert.assertThat(taskInfo.getParentTaskId(), notNullValue());
assertEquals(mainTaskInfo.getTaskId(), taskInfo.getParentTaskId());
switch (taskInfo.getAction()) {
case SearchTransportService.QUERY_ACTION_NAME:
case SearchTransportService.DFS_ACTION_NAME:
case SearchTransportService.QUERY_CAN_MATCH_NAME:
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("shardId[[test][*]]", taskInfo.getDescription()));
break;
case SearchTransportService.QUERY_ID_ACTION_NAME:
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("id[*], indices[test]", taskInfo.getDescription()));
break;
case SearchTransportService.FETCH_ID_ACTION_NAME:
assertTrue(
taskInfo.getDescription(),
Regex.simpleMatch("id[*], size[1], lastEmittedDoc[null]", taskInfo.getDescription())
);
break;
default:
fail("Unexpected action [" + taskInfo.getAction() + "] with description [" + taskInfo.getDescription() + "]");
}

Map<Long, List<ThreadResourceInfo>> threadStats = getThreadStats(SearchAction.NAME + "[*]", taskInfo.getTaskId());
assertEquals(2, threadStats.size()); // Confirm that more than 1 thread worked on this task and recorded stats

// assert that all task descriptions have non-zero length
MatcherAssert.assertThat(taskInfo.getDescription().length(), greaterThan(0));
}
}

private void indexDocumentsWithRefresh(String indexName, int numDocs) {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName)
.setId(String.format("test_id_%d", i))
.setSource(String.format("{\"foo_%d\": \"bar_%d\"}", i, i), XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskResult;
import org.opensearch.tasks.TaskResultsService;
import org.opensearch.tasks.ThreadResourceInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.tasks.MockTaskManager;
import org.opensearch.test.tasks.MockTaskManagerListener;
Expand All @@ -87,6 +88,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -955,7 +957,7 @@ public void tearDown() throws Exception {
/**
* Registers recording task event listeners with the given action mask on all nodes
*/
private void registerTaskManagerListeners(String actionMasks) {
protected void registerTaskManagerListeners(String actionMasks) {
for (String nodeName : internalCluster().getNodeNames()) {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(","));
Expand Down Expand Up @@ -992,7 +994,7 @@ private int numberOfEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>
* @param actionMasks action masks to match
* @return number of events that satisfy the criteria
*/
private List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
protected List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
List<TaskInfo> events = new ArrayList<>();
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
Expand All @@ -1006,6 +1008,19 @@ private List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, Ta
return events;
}

protected Map<Long, List<ThreadResourceInfo>> getThreadStats(String actionMasks, TaskId taskId) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
for (Tuple<TaskId, Map<Long, List<ThreadResourceInfo>>> threadStats : entry.getValue().getThreadStats()) {
if (taskId.equals(threadStats.v1())) {
return threadStats.v2();
}
}
}
}
return new HashMap<>();
}

/**
* Asserts that all tasks in the tasks list have the same parentTask
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ public ThreadPool(
builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000));
builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
builders.put(Names.INDEX_SEARCHER, new FixedExecutorBuilder(settings, Names.INDEX_SEARCHER, allocatedProcessors, 1000, false));
builders.put(
Names.INDEX_SEARCHER,
new ResizableExecutorBuilder(settings, Names.INDEX_SEARCHER, allocatedProcessors, 1000, runnableTaskListener));
}

for (final ExecutorBuilder<?> builder : customBuilders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;
import org.opensearch.test.tasks.MockTaskManagerListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand All @@ -52,6 +55,7 @@ public class RecordingTaskManagerListener implements MockTaskManagerListener {
private String localNodeId;

private List<Tuple<Boolean, TaskInfo>> events = new ArrayList<>();
private List<Tuple<TaskId, Map<Long, List<ThreadResourceInfo>>>> threadStats = new ArrayList<>();

public RecordingTaskManagerListener(String localNodeId, String... actionMasks) {
this.actionMasks = actionMasks;
Expand All @@ -68,7 +72,9 @@ public synchronized void onTaskRegistered(Task task) {
@Override
public synchronized void onTaskUnregistered(Task task) {
if (Regex.simpleMatch(actionMasks, task.getAction())) {
events.add(new Tuple<>(false, task.taskInfo(localNodeId, true)));
TaskInfo taskInfo = task.taskInfo(localNodeId, true);
events.add(new Tuple<>(false, taskInfo));
threadStats.add(new Tuple<>(taskInfo.getTaskId(), task.getResourceStats()));
}
}

Expand All @@ -82,6 +88,10 @@ public synchronized List<Tuple<Boolean, TaskInfo>> getEvents() {
return Collections.unmodifiableList(new ArrayList<>(events));
}

public synchronized List<Tuple<TaskId, Map<Long, List<ThreadResourceInfo>>>> getThreadStats() {
return List.copyOf(threadStats);
}

public synchronized List<TaskInfo> getRegistrationEvents() {
List<TaskInfo> events = this.events.stream().filter(Tuple::v1).map(Tuple::v2).collect(Collectors.toList());
return Collections.unmodifiableList(events);
Expand Down

0 comments on commit cd88594

Please sign in to comment.