Skip to content

Commit

Permalink
Create threads named after its executor/event-loop (#3761)
Browse files Browse the repository at this point in the history
* Create threads named after it's executor/event-loop

* resolve backward compatibility issues

Co-authored-by: Denis Stepanov <denis.stepanov@gmail.com>
  • Loading branch information
graemerocher and dstepanov authored Sep 10, 2020
1 parent 953304f commit 2d7984f
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

import javax.inject.Named;
import javax.inject.Singleton;
Expand All @@ -41,25 +43,31 @@
@BootstrapContextCompatible
public class DefaultEventLoopGroupRegistry implements EventLoopGroupRegistry {
private final EventLoopGroupFactory eventLoopGroupFactory;
private final ThreadFactory threadFactory;
private final BeanLocator beanLocator;

/**
* Default constructor.
*
* @param eventLoopGroupFactory The event loop group factory
* @param threadFactory The thread factory
* @param beanLocator The bean locator
*/
public DefaultEventLoopGroupRegistry(
EventLoopGroupFactory eventLoopGroupFactory,
@Named(NettyThreadFactory.NAME) ThreadFactory threadFactory,
BeanLocator beanLocator) {
public DefaultEventLoopGroupRegistry(EventLoopGroupFactory eventLoopGroupFactory, BeanLocator beanLocator) {
this.eventLoopGroupFactory = eventLoopGroupFactory;
this.threadFactory = threadFactory;
this.beanLocator = beanLocator;
}

/**
* Constructs an event loop group thread factory.
*
* @param configuration The configuration
* @return The thread factory
*/
@EachBean(EventLoopGroupConfiguration.class)
@BootstrapContextCompatible
protected ThreadFactory eventLoopGroupThreadFactory(EventLoopGroupConfiguration configuration) {
return new DefaultThreadFactory(configuration.getName() + "-" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class));
}

/**
* Constructs an event loop group for each configuration.
*
Expand All @@ -79,21 +87,23 @@ protected EventLoopGroup eventLoopGroup(EventLoopGroupConfiguration configuratio
configuration.getIoRatio().orElse(null)
)).orElseThrow(() -> new ConfigurationException("No executor service configured for name: " + executor));
} else {
ThreadFactory threadFactory = beanLocator.getBean(ThreadFactory.class, Qualifiers.byName(configuration.getName()));
return eventLoopGroupFactory.createEventLoopGroup(configuration, threadFactory);
}
}

/**
* Constructs an event loop group with default Configuration.
*
* @param threadFactory The default Netty thread factory
* @return The event loop group
*/
@Singleton
@Requires(missingProperty = EventLoopGroupConfiguration.DEFAULT_LOOP)
@Primary
@Bean(preDestroy = "shutdownGracefully")
@BootstrapContextCompatible
protected EventLoopGroup defaultEventLoopGroup() {
protected EventLoopGroup defaultEventLoopGroup(@Named(NettyThreadFactory.NAME) ThreadFactory threadFactory) {
return eventLoopGroupFactory.createEventLoopGroup(new DefaultEventLoopGroupConfiguration(), threadFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ public class NettyThreadFactory {
@Named(NAME)
@BootstrapContextCompatible
protected ThreadFactory nettyThreadFactory() {
return new DefaultThreadFactory(NioEventLoopGroup.class);
return new DefaultThreadFactory("default-" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ class ThreadSelectionSpec extends Specification {
@Unroll
void "test thread selection strategy #strategy"() {
given:
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer, ['micronaut.server.thread-selection': strategy])
ThreadSelectionClient client = embeddedServer.applicationContext.getBean(ThreadSelectionClient)
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer, ['micronaut.server.thread-selection': strategy])
ThreadSelectionClient client = embeddedServer.applicationContext.getBean(ThreadSelectionClient)

expect:
client.blocking().contains(blocking)
client.nonblocking().contains(nonBlocking)
client.reactive().contains(reactive)
client.reactiveBlocking().contains(blockingReactive)
client.blocking().contains(blocking)
client.nonblocking().contains(nonBlocking)
client.reactive().contains(reactive)
client.reactiveBlocking().contains(blockingReactive)

cleanup:
embeddedServer.close()
embeddedServer.close()

where:
strategy | blocking | nonBlocking | reactive | blockingReactive | scheduleBlocking | scheduleReactive
ThreadSelection.AUTO | 'pool-' | 'nioEventLoopGroup' | 'nioEventLoopGroup' | 'pool-' | "pool-" | "pool-"
ThreadSelection.IO | 'pool-' | 'pool-' | 'pool-' | 'pool-' | "pool-" | "pool-"
ThreadSelection.MANUAL | 'nioEventLoopGroup' | 'nioEventLoopGroup' | 'nioEventLoopGroup' | 'nioEventLoopGroup' | "pool-" | "pool-"
strategy | blocking | nonBlocking | reactive | blockingReactive | scheduleBlocking | scheduleReactive
ThreadSelection.AUTO | 'io-executor-thread-' | 'default-nioEventLoopGroup' | 'default-nioEventLoopGroup' | 'io-executor-thread-' | "io-executor-thread-" | "io-executor-thread-"
ThreadSelection.IO | 'io-executor-thread-' | 'io-executor-thread-' | 'io-executor-thread-' | 'io-executor-thread-' | "io-executor-thread-" | "io-executor-thread-"
ThreadSelection.MANUAL | 'default-nioEventLoopGroup' | 'default-nioEventLoopGroup' | 'default-nioEventLoopGroup' | 'default-nioEventLoopGroup' | "io-executor-thread-" | "io-executor-thread-"


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.micronaut.scheduling.executor;

import edu.umd.cs.findbugs.annotations.Nullable;

import javax.validation.constraints.Min;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -45,6 +47,14 @@ public interface ExecutorConfiguration {
*/
String PREFIX_CONSUMER = PREFIX + ".consumer";

/**
* @return The name of the component
*/
@Nullable
default String getName() {
return null;
}

/**
* @return The {@link io.micronaut.scheduling.executor.ExecutorType}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package io.micronaut.scheduling.executor;

import io.micronaut.context.BeanLocator;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.core.reflect.InstantiationUtils;
import io.micronaut.inject.qualifiers.Qualifiers;

import javax.inject.Inject;
import java.util.concurrent.*;

/**
Expand All @@ -31,14 +34,40 @@
@Factory
public class ExecutorFactory {

private final BeanLocator beanLocator;
private final ThreadFactory threadFactory;

/**
*
* @param threadFactory The factory to create new threads
* @deprecated Use {@link #ExecutorFactory(BeanLocator, ThreadFactory)} instead
*/
@Deprecated
public ExecutorFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.beanLocator = null;
}

/**
*
* @param beanLocator The bean beanLocator
* @param threadFactory The factory to create new threads
* @since 2.0.1
*/
@Inject
public ExecutorFactory(BeanLocator beanLocator, ThreadFactory threadFactory) {
this.beanLocator = beanLocator;
this.threadFactory = threadFactory;
}

/**
* Constructs an executor thread factory.
*
* @param configuration The configuration
* @return The thread factory
*/
@EachBean(ExecutorConfiguration.class)
protected ThreadFactory eventLoopGroupThreadFactory(ExecutorConfiguration configuration) {
return configuration.getName() == null ? threadFactory : new NamedThreadFactory(configuration.getName() + "-executor");
}

/**
Expand All @@ -53,31 +82,34 @@ public ExecutorService executorService(ExecutorConfiguration executorConfigurati
ExecutorType executorType = executorConfiguration.getType();
switch (executorType) {
case FIXED:
return executorConfiguration
.getThreadFactoryClass()
.flatMap(InstantiationUtils::tryInstantiate)
.map(factory -> Executors.newFixedThreadPool(executorConfiguration.getNumberOfThreads(), factory))
.orElse(Executors.newFixedThreadPool(executorConfiguration.getNumberOfThreads(), threadFactory));

return Executors.newFixedThreadPool(executorConfiguration.getNumberOfThreads(), getThreadFactory(executorConfiguration));
case CACHED:
return executorConfiguration
.getThreadFactoryClass()
.flatMap(InstantiationUtils::tryInstantiate)
.map(Executors::newCachedThreadPool)
.orElse(Executors.newCachedThreadPool(threadFactory));

return Executors.newCachedThreadPool(getThreadFactory(executorConfiguration));
case SCHEDULED:
return executorConfiguration
.getThreadFactoryClass()
.flatMap(InstantiationUtils::tryInstantiate)
.map(factory -> Executors.newScheduledThreadPool(executorConfiguration.getCorePoolSize(), factory))
.orElse(Executors.newScheduledThreadPool(executorConfiguration.getCorePoolSize(), threadFactory));

return Executors.newScheduledThreadPool(executorConfiguration.getCorePoolSize(), getThreadFactory(executorConfiguration));
case WORK_STEALING:
return Executors.newWorkStealingPool(executorConfiguration.getParallelism());

default:
throw new IllegalStateException("Could not create Executor service for enum value: " + executorType);
}
}

private ThreadFactory getThreadFactory(ExecutorConfiguration executorConfiguration) {
return executorConfiguration
.getThreadFactoryClass()
.flatMap(InstantiationUtils::tryInstantiate)
.map(tf -> (ThreadFactory) tf)
.orElseGet(() -> {
if (beanLocator != null) {
if (executorConfiguration.getName() == null) {
return beanLocator.getBean(ThreadFactory.class);
}
return beanLocator.getBean(ThreadFactory.class, Qualifiers.byName(executorConfiguration.getName()));
} else {
throw new IllegalStateException("No bean factory configured");
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public class IOExecutorServiceConfig {
@Singleton
@Named(TaskExecutors.IO)
ExecutorConfiguration configuration() {
return UserExecutorConfiguration.of(ExecutorType.CACHED);
return UserExecutorConfiguration.of(TaskExecutors.IO, ExecutorType.CACHED);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.scheduling.executor;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.util.ArgumentUtils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Creates new named threads.
*
* @author Denis Stepanov
* @since 2.0.1
*/
@Internal
class NamedThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

/**
* The constructor.
*
* @param name new thread's prefix
*/
NamedThreadFactory(String name) {
ArgumentUtils.check("name", name).notNull();
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-thread-";
}

/**
* Constructs a new {@code Thread}.
*
* @param runnable The Runnable
* @return new thread
*/
@Override
public Thread newThread(Runnable runnable) {
Thread newThread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
if (newThread.isDaemon()) {
newThread.setDaemon(false);
}
if (newThread.getPriority() != Thread.NORM_PRIORITY) {
newThread.setPriority(Thread.NORM_PRIORITY);
}
return newThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public class ScheduledExecutorServiceConfig {
@Singleton
@Named(TaskExecutors.SCHEDULED)
ExecutorConfiguration configuration() {
return UserExecutorConfiguration.of(ExecutorType.SCHEDULED);
return UserExecutorConfiguration.of(TaskExecutors.SCHEDULED, ExecutorType.SCHEDULED);
}
}
Loading

0 comments on commit 2d7984f

Please sign in to comment.