Skip to content

Remove the listener thread pool #53314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions docs/reference/migration/migrate_8_0/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,3 @@ processors. As this leads to more context switches and more threads but without
an increase in the number of physical CPUs on which to schedule these additional
threads, the `node.processors` setting is now bounded by the number of available
processors.

[float]
==== `thread_pool.listener.size` and `thread_pool.listener.queue_size` have been deprecated

The listener thread pool is no longer used internally by Elasticsearch.
Therefore, these settings have been deprecated. You can safely remove these
settings from the configuration of your nodes.
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean trackEWMA) {
this(settings, name, size, queueSize, trackEWMA, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
* @param deprecated whether or not the thread pool is deprecated
*/
FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final boolean trackEWMA,
final boolean deprecated
) {
this(settings, name, size, queueSize, "thread_pool." + name, trackEWMA, deprecated);
this(settings, name, size, queueSize, "thread_pool." + name, trackEWMA);
}

/**
Expand All @@ -87,45 +66,16 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix,
final boolean trackEWMA) {
this(settings, name, size, queueSize, prefix, trackEWMA, false);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
* @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
* @param deprecated whether or not the thread pool is deprecated
*/
public FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final String prefix,
final boolean trackEWMA,
final boolean deprecated
) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
final Setting.Property[] properties;
if (deprecated) {
properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
} else {
properties = new Setting.Property[]{Setting.Property.NodeScope};
}
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
new Setting<>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: is the indent change intended here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, to deprecate the listener thread pool, I added some temporary infrastructure in this class (see #53266) which changed the indentation here because of my IDE having different settings than when I authored this code a long time ago. 🤷‍♀

Since after removing this setting this infrastructure isn't needed anymore, I simply reverted the file back to its initial state before #53266, including the original formatting.

Long live automatic code formatting!

sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
this.trackEWMA = trackEWMA;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public class ThreadPool implements Scheduler {
public static class Names {
public static final String SAME = "same";
public static final String GENERIC = "generic";
@Deprecated public static final String LISTENER = "listener";
public static final String GET = "get";
public static final String ANALYZE = "analyze";
public static final String WRITE = "write";
Expand Down Expand Up @@ -115,7 +114,6 @@ public static ThreadPoolType fromType(String type) {
public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES = Map.ofEntries(
entry(Names.SAME, ThreadPoolType.DIRECT),
entry(Names.GENERIC, ThreadPoolType.SCALING),
entry(Names.LISTENER, ThreadPoolType.FIXED),
entry(Names.GET, ThreadPoolType.FIXED),
entry(Names.ANALYZE, ThreadPoolType.FIXED),
entry(Names.WRITE, ThreadPoolType.FIXED),
Expand Down Expand Up @@ -172,7 +170,6 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, false, true));
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -89,10 +88,6 @@ public void testRejectedExecutionCounter() throws InterruptedException {
} finally {
terminateThreadPoolIfNeeded(threadPool);
}

if (Names.LISTENER.equals(threadPoolName)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.queue_size", "thread_pool.listener.size"});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ public void testFixedExecutorType() throws InterruptedException {
} finally {
terminateThreadPoolIfNeeded(threadPool);
}

if (Names.LISTENER.equals(threadPoolName)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.size"});
}
}

public void testScalingExecutorType() throws InterruptedException {
Expand Down Expand Up @@ -177,10 +173,6 @@ public void testShutdownNowInterrupts() throws Exception {
} finally {
terminateThreadPoolIfNeeded(threadPool);
}

if (Names.LISTENER.equals(threadPoolName)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.queue_size"});
}
}

public void testCustomThreadPool() throws Exception {
Expand Down