From e1c3c31beffbd1db6f71aef00a683c3b91d642fc Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Tue, 12 Sep 2023 19:24:11 +0000 Subject: [PATCH] Add threadpool wait time metric (#9681) Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../test/cat.thread_pool/10_basic.yml | 26 ++++++++++ .../search/stats/ConcurrentSearchStatsIT.java | 52 +++++++++++++++++++ .../OpenSearchThreadPoolExecutor.java | 11 ++++ ...eResizingOpenSearchThreadPoolExecutor.java | 8 +++ .../common/util/concurrent/TimedRunnable.java | 8 +++ .../rest/action/cat/RestThreadPoolAction.java | 5 ++ .../org/opensearch/threadpool/ThreadPool.java | 8 +-- .../threadpool/ThreadPoolStats.java | 26 +++++++++- .../cluster/node/stats/NodeStatsTests.java | 3 +- .../threadpool/ThreadPoolStatsTests.java | 24 ++++----- 11 files changed, 155 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9d55c8cc412c..a8766c8f97409 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) ### Dependencies - Bump `org.apache.commons:commons-compress` from 1.23.0 to 1.24.0 ([#9973](https://github.com/opensearch-project/OpenSearch/pull/9973)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml index 1ce8468cb51f9..166bbb6a50276 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml @@ -1,3 +1,29 @@ +"Test cat thread_pool total_wait_time output": + - skip: + version: " - 2.10.99" + reason: thread_pool total_wait_time stats were introduced in V_2.11.0 + + - do: + cat.thread_pool: {} + + - match: + $body: | + / #node_name name active queue rejected + ^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ + + - do: + cat.thread_pool: + thread_pool_patterns: search,search_throttled,index_searcher,generic + h: name,total_wait_time,twt + v: true + + - match: + $body: | + /^ name \s+ total_wait_time \s+ twt \n + (generic \s+ -1 \s+ -1 \n + search \s+ \d+s \s+ \d+s \n + search_throttled \s+ \d+s \s+ \d+s \n)+ $/ + --- "Test cat thread_pool output": - skip: diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java index d2af7fc6effe7..caaa9e8f425bb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java @@ -8,6 +8,8 @@ package org.opensearch.search.stats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -25,6 +27,8 @@ import org.opensearch.script.ScriptType; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; import java.util.Arrays; import java.util.Collection; @@ -307,6 +311,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException { assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0); } + public void testThreadPoolWaitTime() throws Exception { + int NUM_SHARDS = 1; + String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createIndex( + INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + + ensureGreen(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).get(); + refresh(); + } + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10)) + .execute() + .actionGet(); + + client().prepareSearch(INDEX).execute().actionGet(); + + NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder( + client().admin().cluster(), + NodesStatsAction.INSTANCE + ).setNodesIds().all(); + NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet(); + ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool(); + + for (ThreadPoolStats.Stats stats : threadPoolStats) { + if (stats.getName().equals(ThreadPool.Names.INDEX_SEARCHER)) { + assertThat(stats.getWaitTime().micros(), greaterThan(0L)); + } + } + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)) + .execute() + .actionGet(); + } + public static class ScriptedDelayedPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_timeout"; diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index d967b7423ca80..afffec4790873 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) { protected Runnable unwrap(Runnable runnable) { return contextHolder.unwrap(runnable); } + + /** + * Returns the cumulative wait time of the ThreadPool. If the ThreadPool does not support tracking the cumulative pool wait time + * then this should return -1 which will prevent the value from showing up in {@link org.opensearch.threadpool.ThreadPoolStats}. + * ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor} + * does so using the {@link TimedRunnable} to get the difference between Runnable creation and execution. + * + */ + public long getPoolWaitTimeNanos() { + return -1; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java index b4097f6a9bb51..68bae3842b800 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.ExponentiallyWeightedMovingAverage; +import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.unit.TimeValue; import java.util.Locale; @@ -67,6 +68,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT private final int maxQueueSize; private final long targetedResponseTimeNanos; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final CounterMetric poolWaitTime; private final AtomicLong totalTaskNanos = new AtomicLong(0); private final AtomicInteger taskCount = new AtomicInteger(0); @@ -98,6 +100,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.poolWaitTime = new CounterMetric(); logger.debug( "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), @@ -189,6 +192,7 @@ protected void afterExecute(Runnable r, Throwable t) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); } + poolWaitTime.inc(timedRunnable.getWaitTimeNanos()); if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); @@ -289,4 +293,8 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); } + @Override + public long getPoolWaitTimeNanos() { + return poolWaitTime.count(); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java index f3bc50a33453b..2eb6657898008 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java @@ -107,6 +107,14 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + long getWaitTimeNanos() { + if (startTimeNanos == -1) { + // There must have been an exception thrown, the total time is unknown (-1) + return -1; + } + return Math.max(startTimeNanos - creationTimeNanos, 1); + } + /** * If the task was failed or rejected, return true. * Otherwise, false. diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java index f71001f55139a..7a6ff856fd2e9 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java @@ -163,6 +163,10 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks"); table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads"); table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks"); + table.addCell( + "total_wait_time", + "alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue" + ); table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool"); table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool"); table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool"); @@ -267,6 +271,7 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR table.addCell(poolStats == null ? null : poolStats.getRejected()); table.addCell(poolStats == null ? null : poolStats.getLargest()); table.addCell(poolStats == null ? null : poolStats.getCompleted()); + table.addCell(poolStats == null ? null : poolStats.getWaitTime()); table.addCell(core); table.addCell(max); table.addCell(size); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 7c32b5270e8ab..8a860b598b6d8 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -403,19 +403,21 @@ public ThreadPoolStats stats() { long rejected = -1; int largest = -1; long completed = -1; - if (holder.executor() instanceof ThreadPoolExecutor) { - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor(); + long waitTimeNanos = -1; + if (holder.executor() instanceof OpenSearchThreadPoolExecutor) { + OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor(); threads = threadPoolExecutor.getPoolSize(); queue = threadPoolExecutor.getQueue().size(); active = threadPoolExecutor.getActiveCount(); largest = threadPoolExecutor.getLargestPoolSize(); completed = threadPoolExecutor.getCompletedTaskCount(); + waitTimeNanos = threadPoolExecutor.getPoolWaitTimeNanos(); RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) { rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected(); } } - stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed)); + stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos)); } return new ThreadPoolStats(stats); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java index b4d7e4a3fbf7a..7b4c1504d927a 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java @@ -32,6 +32,8 @@ package org.opensearch.threadpool; +import org.opensearch.Version; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -65,8 +67,9 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L)); List copy = new ArrayList<>(stats); Collections.sort(copy); @@ -79,11 +79,11 @@ public void testThreadPoolStatsToXContent() throws IOException { try (BytesStreamOutput os = new BytesStreamOutput()) { List stats = new ArrayList<>(); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L)); - stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L)); + stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L)); ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats); try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {