Skip to content

Commit

Permalink
Merge pull request #3242 from KouShenhai/dev
Browse files Browse the repository at this point in the history
feat: 优化代码[修改虚拟线程乱用或不合理的地方]
  • Loading branch information
KouShenhai authored Jan 1, 2025
2 parents 103adaa + e592aff commit 5b59fb0
Show file tree
Hide file tree
Showing 66 changed files with 224 additions and 713 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class FileUtil {

private static final String RW = "rw";

private static final ExecutorService EXECUTOR = ThreadUtil.newVirtualTaskExecutor();

/**
* 创建目录及文件.
* @param directory 目录
Expand Down Expand Up @@ -104,8 +106,7 @@ public static void write(File file, InputStream in, long size, long chunkSize) t
// 最大偏移量2G【2^31】数据
chunkSize = Math.min(chunkSize, 2L * 1024 * 1024 * 1024);
long chunkCount = (size / chunkSize) + (size % chunkSize == 0 ? 0 : 1);
try (FileChannel inChannel = fis.getChannel();
ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try (FileChannel inChannel = fis.getChannel()) {
List<Callable<Boolean>> futures = new ArrayList<>((int) chunkCount);
// position指针
for (long index = 0, position = 0,
Expand All @@ -132,7 +133,7 @@ public static void write(File file, InputStream in, long size, long chunkSize) t
return true;
});
}
executor.invokeAll(futures);
EXECUTOR.invokeAll(futures);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.ibatis.session.ResultHandler;
import org.laokou.common.i18n.dto.PageQuery;

import java.util.List;

import static org.laokou.common.i18n.dto.PageQuery.PAGE_QUERY;

/**
Expand All @@ -37,4 +39,6 @@ public interface CrudMapper<ID, VERSION, DO> extends BaseMapper<DO> {

long selectObjectCount(@Param(PAGE_QUERY) PageQuery pageQuery);

List<DO> selectObjectPage(@Param("pageQuery") PageQuery pageQuery);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class MybatisUtil {

private static final int DEFAULT_BATCH_NUM = 100000;

private static final ExecutorService EXECUTOR = ThreadUtil.newVirtualTaskExecutor();

private final SqlSessionFactory sqlSessionFactory;

public <T, M> void batch(List<T> dataList, int batchNum, int timeout, Class<M> clazz, BiConsumer<M, T> consumer) {
Expand All @@ -76,29 +78,27 @@ public <T, M> void batch(List<T> dataList, Class<M> clazz, String ds, BiConsumer
*/
public <T, M> void batch(List<T> dataList, int batchNum, int timeout, Class<M> clazz, String ds,
BiConsumer<M, T> consumer) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try {
// 数据分组
List<List<T>> partition = Lists.partition(dataList, batchNum);
AtomicBoolean rollback = new AtomicBoolean(false);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partition.size());
// 虚拟线程
List<Callable<Boolean>> futures = partition.stream().map(item -> (Callable<Boolean>) () -> {
handleBatch(timeout, item, clazz, consumer, rollback, ds, cyclicBarrier);
return true;
}).toList();
// 执行任务
executor.invokeAll(futures);
if (rollback.get()) {
throw new SystemException("S_DS_TransactionRolledBack", "事务已回滚");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage(), e);
try {
// 数据分组
List<List<T>> partition = Lists.partition(dataList, batchNum);
AtomicBoolean rollback = new AtomicBoolean(false);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partition.size());
// 虚拟线程
List<Callable<Boolean>> futures = partition.stream().map(item -> (Callable<Boolean>) () -> {
handleBatch(timeout, item, clazz, consumer, rollback, ds, cyclicBarrier);
return true;
}).toList();
// 执行任务
EXECUTOR.invokeAll(futures);
if (rollback.get()) {
throw new SystemException("S_DS_TransactionRolledBack", "事务已回滚");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage(), e);
}
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.laokou.common.i18n.dto.PageQuery;
import org.laokou.common.mybatisplus.mapper.CrudMapper;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* 数据源数据库映射.
*
Expand All @@ -34,10 +31,6 @@
@Repository
public interface SourceMapper extends CrudMapper<Long, Integer, SourceDO> {

List<SourceDO> selectPageByCondition(@Param("pageQuery") PageQuery pageQuery);

long selectCountByCondition(@Param("pageQuery") PageQuery pageQuery);

SourceDO selectOneByTenantCode(@Param("tenantCode") String tenantCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.laokou.common.i18n.dto.PageQuery;
import org.laokou.common.mybatisplus.mapper.CrudMapper;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* 租户数据库映射.
*
Expand All @@ -34,10 +31,6 @@
@Repository
public interface TenantMapper extends CrudMapper<Long, Integer, TenantDO> {

List<TenantDO> selectPageByCondition(@Param("pageQuery") PageQuery pageQuery);

long selectCountByCondition(@Param("pageQuery") PageQuery pageQuery);

Long selectIdTenantCode(@Param("tenantCode") String tenantCode);
Long selectIdByCode(@Param("code") String code);

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@
where id = #{id}
</select>

<select id="selectIdTenantCode" resultType="java.lang.Long">
<select id="selectIdByCode" resultType="java.lang.Long">
<choose>
<when test="tenantCode == 'laokou'">
<when test="code == 'laokou'">
select 0
</when>
<otherwise>
select id
from boot_sys_tenant
where code = #{tenantCode}
where code = #{code}
and del_flag = 0
</otherwise>
</choose>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
@RocketMQMessageListener(consumerGroup = LAOKOU_MESSAGE_CONSUMER_GROUP, topic = LAOKOU_MESSAGE_TOPIC, messageModel = BROADCASTING, consumeMode = CONCURRENTLY)
public class SubscribeMessageConsumer implements RocketMQListener<MessageExt> {

private static final ExecutorService EXECUTOR = ThreadUtil.newVirtualTaskExecutor();

private final Server webSocketServer;

public SubscribeMessageConsumer(Server webSocketServer) {
Expand All @@ -56,21 +58,19 @@ public SubscribeMessageConsumer(Server webSocketServer) {

@Override
public void onMessage(MessageExt message) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
PayloadCO co = JacksonUtil.toBean(msg, PayloadCO.class);
TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(co.getContent());
List<Callable<Boolean>> callableList = co.getReceivers().stream().map(clientId -> (Callable<Boolean>) () -> {
webSocketServer.send(clientId, webSocketFrame);
return true;
}).toList();
executor.invokeAll(callableList);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage());
}
try {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
PayloadCO co = JacksonUtil.toBean(msg, PayloadCO.class);
TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(co.getContent());
List<Callable<Boolean>> callableList = co.getReceivers().stream().map(clientId -> (Callable<Boolean>) () -> {
webSocketServer.send(clientId, webSocketFrame);
return true;
}).toList();
EXECUTOR.invokeAll(callableList);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误信息:{}", e.getMessage());
throw new SystemException("S_UnKnow_Error", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.cluster.dto.clientobject.ClusterCO;
import org.laokou.admin.cluster.gatewayimpl.database.ClusterMapper;
import org.laokou.admin.cluster.gatewayimpl.database.dataobject.ClusterDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询集群请求执行器.
Expand All @@ -46,18 +41,9 @@ public class ClusterPageQryExe {
private final ClusterMapper clusterMapper;

public Result<Page<ClusterCO>> execute(ClusterPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<ClusterDO>> c1 = CompletableFuture
.supplyAsync(() -> clusterMapper.selectObjectPage(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> clusterMapper.selectObjectCount(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(ClusterConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_Cluster_PageQueryTimeout", "集群分页查询超时");
}
List<ClusterDO> list = clusterMapper.selectObjectPage(qry);
long total = clusterMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(ClusterConvertor::toClientObject).toList(), total));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.dept.dto.clientobject.DeptCO;
import org.laokou.admin.dept.gatewayimpl.database.DeptMapper;
import org.laokou.admin.dept.gatewayimpl.database.dataobject.DeptDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询部门请求执行器.
Expand All @@ -46,18 +41,9 @@ public class DeptPageQryExe {
private final DeptMapper deptMapper;

public Result<Page<DeptCO>> execute(DeptPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<DeptDO>> c1 = CompletableFuture
.supplyAsync(() -> deptMapper.selectPageByCondition(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> deptMapper.selectCountByCondition(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(DeptConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_Dept_PageQueryTimeout", "部门分页查询超时");
}
List<DeptDO> list = deptMapper.selectObjectPage(qry);
long total = deptMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(DeptConvertor::toClientObject).toList(), total));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.dict.dto.clientobject.DictCO;
import org.laokou.admin.dict.gatewayimpl.database.DictMapper;
import org.laokou.admin.dict.gatewayimpl.database.dataobject.DictDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询字典请求执行器.
Expand All @@ -46,18 +41,9 @@ public class DictPageQryExe {
private final DictMapper dictMapper;

public Result<Page<DictCO>> execute(DictPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<DictDO>> c1 = CompletableFuture
.supplyAsync(() -> dictMapper.selectPageByCondition(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> dictMapper.selectCountByCondition(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(DictConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_Dict_PageQueryTimeout", "字典分页查询超时");
}
List<DictDO> list = dictMapper.selectObjectPage(qry);
long total = dictMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(DictConvertor::toClientObject).toList(), total));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.laokou.admin.dictItem.dto.clientobject.DictItemCO;
import org.laokou.admin.dictItem.gatewayimpl.database.DictItemMapper;
import org.laokou.admin.dictItem.gatewayimpl.database.dataobject.DictItemDO;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.i18n.common.exception.SystemException;
import org.laokou.common.i18n.dto.Page;
import org.laokou.common.i18n.dto.Result;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 分页查询字典项请求执行器.
Expand All @@ -46,18 +41,9 @@ public class DictItemPageQryExe {
private final DictItemMapper dictItemMapper;

public Result<Page<DictItemCO>> execute(DictItemPageQry qry) {
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
CompletableFuture<List<DictItemDO>> c1 = CompletableFuture
.supplyAsync(() -> dictItemMapper.selectPageByCondition(qry), executor);
CompletableFuture<Long> c2 = CompletableFuture.supplyAsync(() -> dictItemMapper.selectCountByCondition(qry),
executor);
return Result
.ok(Page.create(c1.get(30, TimeUnit.SECONDS).stream().map(DictItemConvertor::toClientObject).toList(),
c2.get(30, TimeUnit.SECONDS)));
}
catch (Exception e) {
throw new SystemException("S_DictItem_PageQueryTimeout", "字典项分页查询超时");
}
List<DictItemDO> list = dictItemMapper.selectObjectPage(qry);
long total = dictItemMapper.selectObjectCount(qry);
return Result.ok(Page.create(list.stream().map(DictItemConvertor::toClientObject).toList(), total));
}

}
Loading

0 comments on commit 5b59fb0

Please sign in to comment.