Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 优化代码[修改虚拟线程乱用或不合理的地方] #3242

Merged
merged 1 commit into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class FileUtil {

private static final String RW = "rw";

private static final ExecutorService EXECUTOR = ThreadUtil.newVirtualTaskExecutor();

/**
* 创建目录及文件.
* @param directory 目录
Expand Down Expand Up @@ -104,8 +106,7 @@ public static void write(File file, InputStream in, long size, long chunkSize) t
// 最大偏移量2G【2^31】数据
chunkSize = Math.min(chunkSize, 2L * 1024 * 1024 * 1024);
long chunkCount = (size / chunkSize) + (size % chunkSize == 0 ? 0 : 1);
try (FileChannel inChannel = fis.getChannel();
ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try (FileChannel inChannel = fis.getChannel()) {
List<Callable<Boolean>> futures = new ArrayList<>((int) chunkCount);
// position指针
for (long index = 0, position = 0,
Expand All @@ -132,7 +133,7 @@ public static void write(File file, InputStream in, long size, long chunkSize) t
return true;
});
}
executor.invokeAll(futures);
EXECUTOR.invokeAll(futures);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.ibatis.session.ResultHandler;
import org.laokou.common.i18n.dto.PageQuery;

import java.util.List;

import static org.laokou.common.i18n.dto.PageQuery.PAGE_QUERY;

/**
Expand All @@ -37,4 +39,6 @@ public interface CrudMapper<ID, VERSION, DO> extends BaseMapper<DO> {

long selectObjectCount(@Param(PAGE_QUERY) PageQuery pageQuery);

List<DO> selectObjectPage(@Param("pageQuery") PageQuery pageQuery);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class MybatisUtil {

private static final int DEFAULT_BATCH_NUM = 100000;

private static final ExecutorService EXECUTOR = ThreadUtil.newVirtualTaskExecutor();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议(性能): 考虑添加适当的执行器关闭处理以防止资源泄漏

静态执行器需要仔细的生命周期管理。考虑实现关闭钩子或应用程序生命周期监听器,以确保执行器服务的正确清理。

Original comment in English

suggestion (performance): Consider adding proper executor shutdown handling to prevent resource leaks

Static executors need careful lifecycle management. Consider implementing a shutdown hook or application lifecycle listener to ensure proper cleanup of the executor service.


private final SqlSessionFactory sqlSessionFactory;

public <T, M> void batch(List<T> dataList, int batchNum, int timeout, Class<M> clazz, BiConsumer<M, T> consumer) {
Expand All @@ -76,29 +78,27 @@ public <T, M> void batch(List<T> dataList, Class<M> clazz, String ds, BiConsumer
*/
public <T, M> void batch(List<T> dataList, int batchNum, int timeout, Class<M> clazz, String ds,
BiConsumer<M, T> consumer) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try {
// 数据分组
List<List<T>> partition = Lists.partition(dataList, batchNum);
AtomicBoolean rollback = new AtomicBoolean(false);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partition.size());
// 虚拟线程
List<Callable<Boolean>> futures = partition.stream().map(item -> (Callable<Boolean>) () -> {
handleBatch(timeout, item, clazz, consumer, rollback, ds, cyclicBarrier);
return true;
}).toList();
// 执行任务
executor.invokeAll(futures);
if (rollback.get()) {
throw new SystemException("S_DS_TransactionRolledBack", "事务已回滚");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage(), e);
try {
// 数据分组
List<List<T>> partition = Lists.partition(dataList, batchNum);
AtomicBoolean rollback = new AtomicBoolean(false);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partition.size());
// 虚拟线程
List<Callable<Boolean>> futures = partition.stream().map(item -> (Callable<Boolean>) () -> {
handleBatch(timeout, item, clazz, consumer, rollback, ds, cyclicBarrier);
return true;
}).toList();
// 执行任务
EXECUTOR.invokeAll(futures);
if (rollback.get()) {
throw new SystemException("S_DS_TransactionRolledBack", "事务已回滚");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage(), e);
}
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.laokou.common.i18n.dto.PageQuery;
import org.laokou.common.mybatisplus.mapper.CrudMapper;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* 数据源数据库映射.
*
Expand All @@ -34,10 +31,6 @@
@Repository
public interface SourceMapper extends CrudMapper<Long, Integer, SourceDO> {

List<SourceDO> selectPageByCondition(@Param("pageQuery") PageQuery pageQuery);

long selectCountByCondition(@Param("pageQuery") PageQuery pageQuery);

SourceDO selectOneByTenantCode(@Param("tenantCode") String tenantCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.laokou.common.i18n.dto.PageQuery;
import org.laokou.common.mybatisplus.mapper.CrudMapper;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* 租户数据库映射.
*
Expand All @@ -34,10 +31,6 @@
@Repository
public interface TenantMapper extends CrudMapper<Long, Integer, TenantDO> {

List<TenantDO> selectPageByCondition(@Param("pageQuery") PageQuery pageQuery);

long selectCountByCondition(@Param("pageQuery") PageQuery pageQuery);

Long selectIdTenantCode(@Param("tenantCode") String tenantCode);
Long selectIdByCode(@Param("code") String code);

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@
where id = #{id}
</select>

<select id="selectIdTenantCode" resultType="java.lang.Long">
<select id="selectIdByCode" resultType="java.lang.Long">
<choose>
<when test="tenantCode == 'laokou'">
<when test="code == 'laokou'">
select 0
</when>
<otherwise>
select id
from boot_sys_tenant
where code = #{tenantCode}
where code = #{code}
and del_flag = 0
</otherwise>
</choose>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
@RocketMQMessageListener(consumerGroup = LAOKOU_MESSAGE_CONSUMER_GROUP, topic = LAOKOU_MESSAGE_TOPIC, messageModel = BROADCASTING, consumeMode = CONCURRENTLY)
public class SubscribeMessageConsumer implements RocketMQListener<MessageExt> {

private static final ExecutorService EXECUTOR = ThreadUtil.newVirtualTaskExecutor();

private final Server webSocketServer;

public SubscribeMessageConsumer(Server webSocketServer) {
Expand All @@ -56,21 +58,19 @@ public SubscribeMessageConsumer(Server webSocketServer) {

@Override
public void onMessage(MessageExt message) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
PayloadCO co = JacksonUtil.toBean(msg, PayloadCO.class);
TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(co.getContent());
List<Callable<Boolean>> callableList = co.getReceivers().stream().map(clientId -> (Callable<Boolean>) () -> {
webSocketServer.send(clientId, webSocketFrame);
return true;
}).toList();
executor.invokeAll(callableList);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage());
}
try {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
PayloadCO co = JacksonUtil.toBean(msg, PayloadCO.class);
TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(co.getContent());
List<Callable<Boolean>> callableList = co.getReceivers().stream().map(clientId -> (Callable<Boolean>) () -> {
webSocketServer.send(clientId, webSocketFrame);
return true;
}).toList();
EXECUTOR.invokeAll(callableList);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.cluster.dto.clientobject.ClusterCO;
import org.laokou.admin.cluster.gatewayimpl.database.ClusterMapper;
import org.laokou.admin.cluster.gatewayimpl.database.dataobject.ClusterDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询集群请求执行器.
Expand All @@ -46,18 +41,9 @@ public class ClusterPageQryExe {
private final ClusterMapper clusterMapper;

public Result<Page<ClusterCO>> execute(ClusterPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<ClusterDO>> c1 = CompletableFuture
.supplyAsync(() -> clusterMapper.selectObjectPage(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> clusterMapper.selectObjectCount(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(ClusterConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_Cluster_PageQueryTimeout", "集群分页查询超时");
}
List<ClusterDO> list = clusterMapper.selectObjectPage(qry);
long total = clusterMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(ClusterConvertor::toClientObject).toList(), total));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.dept.dto.clientobject.DeptCO;
import org.laokou.admin.dept.gatewayimpl.database.DeptMapper;
import org.laokou.admin.dept.gatewayimpl.database.dataobject.DeptDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询部门请求执行器.
Expand All @@ -46,18 +41,9 @@ public class DeptPageQryExe {
private final DeptMapper deptMapper;

public Result<Page<DeptCO>> execute(DeptPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<DeptDO>> c1 = CompletableFuture
.supplyAsync(() -> deptMapper.selectPageByCondition(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> deptMapper.selectCountByCondition(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(DeptConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_Dept_PageQueryTimeout", "部门分页查询超时");
}
List<DeptDO> list = deptMapper.selectObjectPage(qry);
long total = deptMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(DeptConvertor::toClientObject).toList(), total));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.dict.dto.clientobject.DictCO;
import org.laokou.admin.dict.gatewayimpl.database.DictMapper;
import org.laokou.admin.dict.gatewayimpl.database.dataobject.DictDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询字典请求执行器.
Expand All @@ -46,18 +41,9 @@ public class DictPageQryExe {
private final DictMapper dictMapper;

public Result<Page<DictCO>> execute(DictPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<DictDO>> c1 = CompletableFuture
.supplyAsync(() -> dictMapper.selectPageByCondition(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> dictMapper.selectCountByCondition(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(DictConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_Dict_PageQueryTimeout", "字典分页查询超时");
}
List<DictDO> list = dictMapper.selectObjectPage(qry);
long total = dictMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(DictConvertor::toClientObject).toList(), total));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.dictItem.dto.clientobject.DictItemCO;
import org.laokou.admin.dictItem.gatewayimpl.database.DictItemMapper;
import org.laokou.admin.dictItem.gatewayimpl.database.dataobject.DictItemDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询字典项请求执行器.
Expand All @@ -46,18 +41,9 @@ public class DictItemPageQryExe {
private final DictItemMapper dictItemMapper;

public Result<Page<DictItemCO>> execute(DictItemPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<DictItemDO>> c1 = CompletableFuture
.supplyAsync(() -> dictItemMapper.selectPageByCondition(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> dictItemMapper.selectCountByCondition(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(DictItemConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_DictItem_PageQueryTimeout", "字典项分页查询超时");
}
List<DictItemDO> list = dictItemMapper.selectObjectPage(qry);
long total = dictItemMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(DictItemConvertor::toClientObject).toList(), total));
}

}
Loading
Loading