Skip to content

Commit

Permalink
refactor(core): 重构线程池自动配置
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles7c committed Jun 25, 2024
1 parent a89765f commit de056aa
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package top.continew.starter.core.autoconfigure.threadpool;

import cn.hutool.core.util.ArrayUtil;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
Expand All @@ -25,40 +26,39 @@
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import top.continew.starter.core.constant.PropertiesConstants;
import top.continew.starter.core.exception.BaseException;

import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
* 异步任务自动配置
*
* @author Charles7c
* @author Lion Li(<a href="https://gitee.com/dromara/RuoYi-Vue-Plus">RuoYi-Vue-Plus</a>)
* @since 1.0.0
*/
@Lazy
@AutoConfiguration
@EnableAsync(proxyTargetClass = true)
@ConditionalOnProperty(prefix = PropertiesConstants.THREAD_POOL, name = PropertiesConstants.ENABLED, havingValue = "true")
@ConditionalOnProperty(prefix = "spring.task.execution.extension", name = PropertiesConstants.ENABLED, matchIfMissing = true)
public class AsyncAutoConfiguration implements AsyncConfigurer {

private static final Logger log = LoggerFactory.getLogger(AsyncAutoConfiguration.class);

private final ScheduledExecutorService scheduledExecutorService;
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

public AsyncAutoConfiguration(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
public AsyncAutoConfiguration(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
this.threadPoolTaskExecutor = threadPoolTaskExecutor;
}

/**
* 异步任务 @Async 执行时,使用 Java 内置线程池
* 异步任务线程池配置
*/
@Override
public Executor getAsyncExecutor() {
log.debug("[ContiNew Starter] - Auto Configuration 'AsyncConfigurer' completed initialization.");
return scheduledExecutorService;
return threadPoolTaskExecutor;
}

/**
Expand All @@ -79,4 +79,9 @@ public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
throw new BaseException(sb.toString());
};
}

@PostConstruct
public void postConstruct() {
log.debug("[ContiNew Starter] - Auto Configuration 'AsyncConfigurer' completed initialization.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,148 +16,70 @@

package top.continew.starter.core.autoconfigure.threadpool;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.task.TaskExecutorCustomizer;
import org.springframework.boot.task.TaskSchedulerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import top.continew.starter.core.constant.PropertiesConstants;
import top.continew.starter.core.util.ExceptionUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 线程池自动配置
*
* @author Charles7c
* @author Lion Li(<a href="https://gitee.com/dromara/RuoYi-Vue-Plus">RuoYi-Vue-Plus</a>)
* @since 1.0.0
*/
@Lazy
@AutoConfiguration
@ConditionalOnProperty(prefix = PropertiesConstants.THREAD_POOL, name = PropertiesConstants.ENABLED, havingValue = "true")
@EnableConfigurationProperties(ThreadPoolProperties.class)
@EnableConfigurationProperties(ThreadPoolExtensionProperties.class)
public class ThreadPoolAutoConfiguration {

private static final Logger log = LoggerFactory.getLogger(ThreadPoolAutoConfiguration.class);

/**
* 核心(最小)线程数 = CPU 核心数 + 1
*/
/** 核心(最小)线程数 = CPU 核心数 + 1 */
private final int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;

/**
* Spring 内置线程池:ThreadPoolTaskExecutor
*/
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor(ThreadPoolProperties properties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("thread-pool");
// 核心(最小)线程数
executor.setCorePoolSize(ObjectUtil.defaultIfNull(properties.getCorePoolSize(), corePoolSize));
// 最大线程数
executor.setMaxPoolSize(ObjectUtil.defaultIfNull(properties.getMaxPoolSize(), corePoolSize * 2));
// 队列容量
executor.setQueueCapacity(properties.getQueueCapacity());
// 活跃时间
executor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
// 配置当池内线程数已达到上限的时候,该如何处理新任务:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 关闭线程池是否等待任务完成
executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutdown());
// 执行器在关闭时阻塞的最长毫秒数,以等待剩余任务完成执行
executor.setAwaitTerminationMillis(properties.getAwaitTerminationMillis());
log.debug("[ContiNew Starter] - Auto Configuration 'ThreadPoolTaskExecutor' completed initialization.");
return executor;
}

/**
* Java 内置线程池:ScheduledExecutorService(适用于执行周期性或定时任务)
* 异步任务线程池配置
*/
@Bean
@ConditionalOnMissingBean
public ScheduledExecutorService scheduledExecutorService(ThreadPoolProperties properties) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(ObjectUtil.defaultIfNull(properties
.getCorePoolSize(), corePoolSize), ThreadUtil
.newNamedThreadFactory("schedule-pool-%d", true), new ThreadPoolExecutor.CallerRunsPolicy()) {
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
ExceptionUtils.printException(runnable, throwable);
@ConditionalOnProperty(prefix = "spring.task.execution.extension", name = PropertiesConstants.ENABLED, matchIfMissing = true)
public TaskExecutorCustomizer taskExecutorCustomizer(ThreadPoolExtensionProperties properties) {
return executor -> {
if (executor.getMaxPoolSize() == Integer.MAX_VALUE) {
// 核心(最小)线程数
executor.setCorePoolSize(corePoolSize);
// 最大线程数
executor.setMaxPoolSize(corePoolSize * 2);
// 队列容量
executor.setQueueCapacity(executor.getMaxPoolSize());
}
// 当线程池的任务缓存队列已满并且线程池中的线程数已达到 maxPoolSize 时采取的任务拒绝策略
executor.setRejectedExecutionHandler(properties.getExecution()
.getRejectedPolicy()
.getRejectedExecutionHandler());
log.debug("[ContiNew Starter] - Auto Configuration 'TaskExecutor' completed initialization.");
};
// 应用关闭时,关闭线程池
SpringApplication.getShutdownHandlers().add(() -> this.shutdown(executor, properties));
log.debug("[ContiNew Starter] - Auto Configuration 'ScheduledExecutorService' completed initialization.");
return executor;
}

/**
* 根据相应的配置设置关闭 ExecutorService
*
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#shutdown()
* @since 2.0.0
*/
public void shutdown(ExecutorService executor, ThreadPoolProperties properties) {
log.debug("[ContiNew Starter] - Shutting down ScheduledExecutorService start.");
if (executor != null) {
if (properties.isWaitForTasksToCompleteOnShutdown()) {
executor.shutdown();
} else {
for (Runnable remainingTask : executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(executor, properties);
log.debug("[ContiNew Starter] - Shutting down ScheduledExecutorService complete.");
}
}

/**
* Cancel the given remaining task which never commenced execution,
* as returned from {@link ExecutorService#shutdownNow()}.
*
* @param task the task to cancel (typically a {@link RunnableFuture})
* @see RunnableFuture#cancel(boolean)
* @since 2.0.0
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future<?> future) {
future.cancel(true);
}
}

/**
* Wait for the executor to terminate, according to the value of the properties
*
* @since 2.0.0
* 调度任务线程池配置
*/
private void awaitTerminationIfNecessary(ExecutorService executor, ThreadPoolProperties properties) {
if (properties.getAwaitTerminationMillis() > 0) {
try {
if (!executor.awaitTermination(properties.getAwaitTerminationMillis(), TimeUnit.MILLISECONDS)) {
if (log.isWarnEnabled()) {
log.warn("[ContiNew Starter] - Timed out while waiting for executor 'ScheduledExecutorService' to terminate.");
}
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("[ContiNew Starter] - Interrupted while waiting for executor 'ScheduledExecutorService' to terminate");
}
Thread.currentThread().interrupt();
@Bean
@ConditionalOnProperty(prefix = "spring.task.scheduling.extension", name = PropertiesConstants.ENABLED, matchIfMissing = true)
public TaskSchedulerCustomizer taskSchedulerCustomizer(ThreadPoolExtensionProperties properties) {
return executor -> {
if (executor.getPoolSize() <= 1) {
executor.setPoolSize(corePoolSize);
}
}
executor.setRejectedExecutionHandler(properties.getScheduling()
.getRejectedPolicy()
.getRejectedExecutionHandler());
log.debug("[ContiNew Starter] - Auto Configuration 'TaskScheduler' completed initialization.");
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package top.continew.starter.core.autoconfigure.threadpool;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池拒绝策略
*
* @author Charles7c
* @since 2.2.0
*/
public enum ThreadPoolExecutorRejectedPolicy {

/**
* ThreadPoolTaskExecutor 默认的拒绝策略,不执行新任务,直接抛出 RejectedExecutionException 异常
*/
ABORT {
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return new ThreadPoolExecutor.AbortPolicy();
}
},

/**
* 提交的任务在执行被拒绝时,会由提交任务的线程去执行
*/
CALLER_RUNS {
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return new ThreadPoolExecutor.CallerRunsPolicy();
}
},

/**
* 不执行新任务,也不抛出异常
*/
DISCARD {
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return new ThreadPoolExecutor.DiscardPolicy();
}
},

/**
* 拒绝新任务,但是会抛弃队列中最老的任务,然后尝试再次提交新任务
*/
DISCARD_OLDEST {
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return new ThreadPoolExecutor.DiscardOldestPolicy();
}
};

/**
* 获取拒绝处理器
*
* @return 拒绝处理器
*/
public abstract RejectedExecutionHandler getRejectedExecutionHandler();
}
Loading

0 comments on commit de056aa

Please sign in to comment.