Skip to content

Commit

Permalink
Add threadpool wait time metric (opensearch-project#9681)
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 13, 2023
1 parent b4ed668 commit ec8fe05
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))

### Dependencies
- Bump `org.apache.commons:commons-compress` from 1.23.0 to 1.24.0 ([#9973](https://github.com/opensearch-project/OpenSearch/pull/9973))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,29 @@
"Test cat thread_pool total_wait_time output":
- skip:
version: " - 2.10.99"
reason: thread_pool total_wait_time stats were introduced in V_2.11.0

- do:
cat.thread_pool: {}

- match:
$body: |
/ #node_name name active queue rejected
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
- do:
cat.thread_pool:
thread_pool_patterns: search,search_throttled,index_searcher,generic
h: name,total_wait_time,twt
v: true

- match:
$body: |
/^ name \s+ total_wait_time \s+ twt \n
(generic \s+ -1 \s+ -1 \n
search \s+ \d+s \s+ \d+s+ \n
search_throttled \s+ \d+s \s+ \d+s+ \n)+ $/
---
"Test cat thread_pool output":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
Expand All @@ -25,6 +27,8 @@
import org.opensearch.script.ScriptType;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPoolStats;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -307,6 +311,41 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException {
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);
}

public void testThreadPoolWaitTime() throws Exception {
int NUM_SHARDS = 1;
String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
createIndex(
INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);

ensureGreen();

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}

client().prepareSearch(INDEX).execute().actionGet();

NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder(
client().admin().cluster(),
NodesStatsAction.INSTANCE
).setNodesIds().all();
NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet();
ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool();

for (ThreadPoolStats.Stats stats : threadPoolStats) {
if (stats.getName().equals(ThreadPool.Names.INDEX_SEARCHER)) {
assertThat(stats.getWaitTime().nanos(), greaterThan(0L));
}
}
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) {
protected Runnable unwrap(Runnable runnable) {
return contextHolder.unwrap(runnable);
}

/**
* Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time
* then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}.
* ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor}
* does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution.
*
*/
public long getPoolWaitTimeNanos() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.unit.TimeValue;

import java.util.Locale;
Expand Down Expand Up @@ -67,6 +68,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
Expand Down Expand Up @@ -98,6 +100,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
this.poolWaitTime = new CounterMetric();
logger.debug(
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
getName(),
Expand Down Expand Up @@ -189,6 +192,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());

if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
Expand Down Expand Up @@ -289,4 +293,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
}

@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ long getTotalExecutionNanos() {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

long getWaitTimeNanos() {
if (startTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return Math.max(startTimeNanos - creationTimeNanos, 1);
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks");
table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads");
table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks");
table.addCell(
"total_wait_time",
"alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue"
);
table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool");
table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool");
table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool");
Expand Down Expand Up @@ -267,6 +271,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
table.addCell(poolStats == null ? null : poolStats.getRejected());
table.addCell(poolStats == null ? null : poolStats.getLargest());
table.addCell(poolStats == null ? null : poolStats.getCompleted());
table.addCell(poolStats == null ? null : poolStats.getWaitTime());
table.addCell(core);
table.addCell(max);
table.addCell(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,19 +403,21 @@ public ThreadPoolStats stats() {
long rejected = -1;
int largest = -1;
long completed = -1;
if (holder.executor() instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor();
long waitTimeNanos = -1;
if (holder.executor() instanceof OpenSearchThreadPoolExecutor) {
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor();
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
largest = threadPoolExecutor.getLargestPoolSize();
completed = threadPoolExecutor.getCompletedTaskCount();
waitTimeNanos = threadPoolExecutor.getPoolWaitTimeNanos();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed));
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos));
}
return new ThreadPoolStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.threadpool;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -65,15 +67,17 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable<S
private final long rejected;
private final int largest;
private final long completed;
private final long waitTimeNanos;

public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed) {
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long waitTimeNanos) {
this.name = name;
this.threads = threads;
this.queue = queue;
this.active = active;
this.rejected = rejected;
this.largest = largest;
this.completed = completed;
this.waitTimeNanos = waitTimeNanos;
}

public Stats(StreamInput in) throws IOException {
Expand All @@ -84,6 +88,7 @@ public Stats(StreamInput in) throws IOException {
rejected = in.readLong();
largest = in.readInt();
completed = in.readLong();
waitTimeNanos = in.getVersion().onOrAfter(Version.V_2_11_0) ? in.readLong() : -1;
}

@Override
Expand All @@ -95,6 +100,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
if (out.getVersion().onOrAfter(Version.V_2_11_0)) {
out.writeLong(waitTimeNanos);
}
}

public String getName() {
Expand Down Expand Up @@ -125,6 +133,14 @@ public long getCompleted() {
return this.completed;
}

public TimeValue getWaitTime() {
return TimeValue.timeValueNanos(waitTimeNanos);
}

public long getWaitTimeNanos() {
return waitTimeNanos;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
Expand All @@ -146,6 +162,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (completed != -1) {
builder.field(Fields.COMPLETED, completed);
}
if (waitTimeNanos != -1) {
if (builder.humanReadable()) {
builder.field(Fields.WAIT_TIME, getWaitTime());
}
builder.field(Fields.WAIT_TIME_NANOS, getWaitTimeNanos());
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -197,6 +219,8 @@ static final class Fields {
static final String REJECTED = "rejected";
static final String LARGEST = "largest";
static final String COMPLETED = "completed";
static final String WAIT_TIME = "total_wait_time";
static final String WAIT_TIME_NANOS = "total_wait_time_in_nanos";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomIntBetween(1, 1000),
randomNonNegativeLong(),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000)
randomIntBetween(1, 1000),
randomIntBetween(-1, 10)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
public class ThreadPoolStatsTests extends OpenSearchTestCase {
public void testThreadPoolStatsSort() throws IOException {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L));

List<ThreadPoolStats.Stats> copy = new ArrayList<>(stats);
Collections.sort(copy);
Expand All @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException {
try (BytesStreamOutput os = new BytesStreamOutput()) {

List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L));
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L));

ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats);
try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {
Expand Down

0 comments on commit ec8fe05

Please sign in to comment.