From ffa17ad98a3ee1c487bbda11b2d6caba2eee2b12 Mon Sep 17 00:00:00 2001 From: wenmo <32723967+wenmo@users.noreply.github.com> Date: Sun, 29 May 2022 21:49:05 +0800 Subject: [PATCH] [Feature-545][admin] Add Task Pool to solve frequent database writes --- .../controller/JobInstanceController.java | 23 ++++-- .../main/java/com/dlink/job/FlinkJobTask.java | 17 ++-- .../java/com/dlink/job/FlinkJobTaskPool.java | 33 ++++++++ .../java/com/dlink/model/JobInfoDetail.java | 13 ++++ .../com/dlink/service/JobHistoryService.java | 2 +- .../com/dlink/service/JobInstanceService.java | 8 +- .../service/impl/JobHistoryServiceImpl.java | 15 ++-- .../service/impl/JobInstanceServiceImpl.java | 78 +++++++++++++++---- .../dlink/service/impl/TaskServiceImpl.java | 62 +++++++++++---- .../java/com/dlink/pool/AbstractPool.java | 38 +++++++++ 10 files changed, 233 insertions(+), 56 deletions(-) create mode 100644 dlink-admin/src/main/java/com/dlink/job/FlinkJobTaskPool.java create mode 100644 dlink-common/src/main/java/com/dlink/pool/AbstractPool.java diff --git a/dlink-admin/src/main/java/com/dlink/controller/JobInstanceController.java b/dlink-admin/src/main/java/com/dlink/controller/JobInstanceController.java index 7c884d4069..9ad23d5ae1 100644 --- a/dlink-admin/src/main/java/com/dlink/controller/JobInstanceController.java +++ b/dlink-admin/src/main/java/com/dlink/controller/JobInstanceController.java @@ -1,19 +1,26 @@ package com.dlink.controller; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + import com.dlink.common.result.ProTableResult; import com.dlink.common.result.Result; -import com.dlink.model.Jar; import com.dlink.model.JobInstance; import com.dlink.service.JobInstanceService; import com.dlink.service.TaskService; import com.fasterxml.jackson.databind.JsonNode; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import lombok.extern.slf4j.Slf4j; /** * JobInstanceController @@ -35,7 +42,7 @@ public class JobInstanceController { */ @PostMapping public ProTableResult listJobInstances(@RequestBody JsonNode para) { - return jobInstanceService.selectForProTable(para); + return jobInstanceService.listJobInstances(para); } /** diff --git a/dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java b/dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java index b298d52ef8..99fa1afc49 100644 --- a/dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java +++ b/dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java @@ -1,5 +1,12 @@ package com.dlink.job; +import java.time.Duration; +import java.time.LocalDateTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.DependsOn; + import com.dlink.assertion.Asserts; import com.dlink.context.SpringContextUtils; import com.dlink.daemon.constant.FlinkTaskConstant; @@ -9,12 +16,6 @@ import com.dlink.model.JobInstance; import com.dlink.model.JobStatus; import com.dlink.service.TaskService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.DependsOn; - -import java.time.Duration; -import java.time.LocalDateTime; @DependsOn("springContextUtils") public class FlinkJobTask implements DaemonTask { @@ -54,8 +55,10 @@ public void dealTask() { preDealTime = System.currentTimeMillis(); JobInstance jobInstance = taskService.refreshJobInstance(config.getId(), false); if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime()) - && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) { + && Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) { DefaultThreadPool.getInstance().execute(this); + } else { + FlinkJobTaskPool.getInstance().remove(config.getId().toString()); } } } diff --git a/dlink-admin/src/main/java/com/dlink/job/FlinkJobTaskPool.java b/dlink-admin/src/main/java/com/dlink/job/FlinkJobTaskPool.java new file mode 100644 index 0000000000..a9422a2514 --- /dev/null +++ b/dlink-admin/src/main/java/com/dlink/job/FlinkJobTaskPool.java @@ -0,0 +1,33 @@ +package com.dlink.job; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.dlink.model.JobInfoDetail; +import com.dlink.pool.AbstractPool; + +/** + * FlinkJobTaskPool + * + * @author wenmo + * @since 2022/5/28 16:39 + */ +public class FlinkJobTaskPool extends AbstractPool { + + private static volatile Map flinkJobTaskEntityMap = new ConcurrentHashMap<>(); + + private static FlinkJobTaskPool instance = new FlinkJobTaskPool(); + + public static FlinkJobTaskPool getInstance(){ + return instance; + } + + @Override + public Map getMap() { + return flinkJobTaskEntityMap; + } + + public void refresh(JobInfoDetail entity) { + entity.refresh(); + } +} diff --git a/dlink-admin/src/main/java/com/dlink/model/JobInfoDetail.java b/dlink-admin/src/main/java/com/dlink/model/JobInfoDetail.java index 7a99f9c421..a7e549a4ea 100644 --- a/dlink-admin/src/main/java/com/dlink/model/JobInfoDetail.java +++ b/dlink-admin/src/main/java/com/dlink/model/JobInfoDetail.java @@ -14,9 +14,11 @@ public class JobInfoDetail { private ClusterConfiguration clusterConfiguration; private History history; private JobHistory jobHistory; + private Integer refreshCount; public JobInfoDetail(Integer id) { this.id = id; + this.refreshCount = 0; } public Integer getId() { @@ -66,4 +68,15 @@ public JobHistory getJobHistory() { public void setJobHistory(JobHistory jobHistory) { this.jobHistory = jobHistory; } + + public void refresh() { + refreshCount = refreshCount + 1; + if (isNeedSave()) { + refreshCount = 0; + } + } + + public boolean isNeedSave() { + return refreshCount % 60 == 0; + } } diff --git a/dlink-admin/src/main/java/com/dlink/service/JobHistoryService.java b/dlink-admin/src/main/java/com/dlink/service/JobHistoryService.java index 3d843de6f4..759c9bf7b1 100644 --- a/dlink-admin/src/main/java/com/dlink/service/JobHistoryService.java +++ b/dlink-admin/src/main/java/com/dlink/service/JobHistoryService.java @@ -15,5 +15,5 @@ public interface JobHistoryService extends ISuperService { JobHistory getJobHistoryInfo(JobHistory jobHistory); - JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId); + JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave); } diff --git a/dlink-admin/src/main/java/com/dlink/service/JobInstanceService.java b/dlink-admin/src/main/java/com/dlink/service/JobInstanceService.java index d89393c98c..0def77942e 100644 --- a/dlink-admin/src/main/java/com/dlink/service/JobInstanceService.java +++ b/dlink-admin/src/main/java/com/dlink/service/JobInstanceService.java @@ -1,12 +1,14 @@ package com.dlink.service; +import java.util.List; + +import com.dlink.common.result.ProTableResult; import com.dlink.db.service.ISuperService; import com.dlink.explainer.lineage.LineageResult; import com.dlink.model.JobInfoDetail; import com.dlink.model.JobInstance; import com.dlink.model.JobInstanceStatus; - -import java.util.List; +import com.fasterxml.jackson.databind.JsonNode; /** * JobInstanceService @@ -27,4 +29,6 @@ public interface JobInstanceService extends ISuperService { LineageResult getLineage(Integer id); JobInstance getJobInstanceByTaskId(Integer id); + + ProTableResult listJobInstances(JsonNode para); } diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java index 08fc5cecdb..f5e4e76edd 100644 --- a/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java +++ b/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java @@ -1,5 +1,7 @@ package com.dlink.service.impl; +import org.springframework.stereotype.Service; + import com.dlink.api.FlinkAPI; import com.dlink.assertion.Asserts; import com.dlink.db.service.impl.SuperServiceImpl; @@ -8,7 +10,6 @@ import com.dlink.service.JobHistoryService; import com.dlink.utils.JSONUtil; import com.fasterxml.jackson.databind.JsonNode; -import org.springframework.stereotype.Service; /** * JobHistoryServiceImpl @@ -64,7 +65,7 @@ public JobHistory getJobHistoryInfo(JobHistory jobHistory) { } @Override - public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId) { + public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave) { JobHistory jobHistory = new JobHistory(); jobHistory.setId(id); try { @@ -78,10 +79,12 @@ public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jo jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints)); jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig)); jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig)); - if (Asserts.isNotNull(getById(id))) { - updateById(jobHistory); - } else { - save(jobHistory); + if (needSave) { + if (Asserts.isNotNull(getById(id))) { + updateById(jobHistory); + } else { + save(jobHistory); + } } } catch (Exception e) { } finally { diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/JobInstanceServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/JobInstanceServiceImpl.java index 145c0e2675..a51c47b1e1 100644 --- a/dlink-admin/src/main/java/com/dlink/service/impl/JobInstanceServiceImpl.java +++ b/dlink-admin/src/main/java/com/dlink/service/impl/JobInstanceServiceImpl.java @@ -1,17 +1,35 @@ package com.dlink.service.impl; +import java.util.List; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.dlink.assertion.Asserts; +import com.dlink.common.result.ProTableResult; import com.dlink.db.service.impl.SuperServiceImpl; +import com.dlink.db.util.ProTableUtil; import com.dlink.explainer.lineage.LineageBuilder; import com.dlink.explainer.lineage.LineageResult; +import com.dlink.job.FlinkJobTaskPool; import com.dlink.mapper.JobInstanceMapper; -import com.dlink.model.*; -import com.dlink.service.*; +import com.dlink.model.History; +import com.dlink.model.JobInfoDetail; +import com.dlink.model.JobInstance; +import com.dlink.model.JobInstanceCount; +import com.dlink.model.JobInstanceStatus; +import com.dlink.model.JobStatus; +import com.dlink.service.ClusterConfigurationService; +import com.dlink.service.ClusterService; +import com.dlink.service.HistoryService; +import com.dlink.service.JobHistoryService; +import com.dlink.service.JobInstanceService; import com.dlink.utils.JSONUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.List; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; /** * JobInstanceServiceImpl @@ -99,15 +117,22 @@ public JobInfoDetail getJobInfoDetail(Integer id) { @Override public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) { Asserts.checkNull(jobInstance, "该任务实例不存在"); - JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId()); - jobInfoDetail.setInstance(jobInstance); - jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId())); - jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId())); - History history = historyService.getById(jobInstance.getHistoryId()); - history.setConfig(JSONUtil.parseObject(history.getConfigJson())); - jobInfoDetail.setHistory(history); - if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) { - jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); + JobInfoDetail jobInfoDetail; + FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance(); + String key = jobInstance.getId().toString(); + if (pool.exist(key)) { + jobInfoDetail = pool.get(key); + } else { + jobInfoDetail = new JobInfoDetail(jobInstance.getId()); + jobInfoDetail.setInstance(jobInstance); + jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId())); + jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId())); + History history = historyService.getById(jobInstance.getHistoryId()); + history.setConfig(JSONUtil.parseObject(history.getConfigJson())); + jobInfoDetail.setHistory(history); + if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) { + jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); + } } return jobInfoDetail; } @@ -123,4 +148,27 @@ public JobInstance getJobInstanceByTaskId(Integer id) { return baseMapper.getJobInstanceByTaskId(id); } + @Override + public ProTableResult listJobInstances(JsonNode para) { + Integer current = para.has("current") ? para.get("current").asInt() : 1; + Integer pageSize = para.has("pageSize") ? para.get("pageSize").asInt() : 10; + QueryWrapper queryWrapper = new QueryWrapper<>(); + ProTableUtil.autoQueryDefalut(para, queryWrapper); + ObjectMapper mapper = new ObjectMapper(); + Map param = mapper.convertValue(para, Map.class); + Page page = new Page<>(current, pageSize); + List list = baseMapper.selectForProTable(page, queryWrapper, param); + FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance(); + for (int i = 0; i < list.size(); i++) { + if (pool.exist(list.get(i).getId().toString())) { + list.get(i).setStatus(pool.get(list.get(i).getId().toString()).getInstance().getStatus()); + list.get(i).setUpdateTime(pool.get(list.get(i).getId().toString()).getInstance().getUpdateTime()); + list.get(i).setFinishTime(pool.get(list.get(i).getId().toString()).getInstance().getFinishTime()); + list.get(i).setError(pool.get(list.get(i).getId().toString()).getInstance().getError()); + list.get(i).setDuration(pool.get(list.get(i).getId().toString()).getInstance().getDuration()); + } + } + return ProTableResult.builder().success(true).data(list).total(page.getTotal()).current(current).pageSize(pageSize).build(); + } + } diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java index a7dcc2b742..da00d0985f 100644 --- a/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java +++ b/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java @@ -35,6 +35,7 @@ import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.result.SavePointResult; import com.dlink.job.FlinkJobTask; +import com.dlink.job.FlinkJobTaskPool; import com.dlink.job.Job; import com.dlink.job.JobConfig; import com.dlink.job.JobManager; @@ -47,6 +48,7 @@ import com.dlink.model.AlertInstance; import com.dlink.model.Cluster; import com.dlink.model.DataBase; +import com.dlink.model.History; import com.dlink.model.Jar; import com.dlink.model.JobHistory; import com.dlink.model.JobInfoDetail; @@ -63,6 +65,7 @@ import com.dlink.service.ClusterConfigurationService; import com.dlink.service.ClusterService; import com.dlink.service.DataBaseService; +import com.dlink.service.HistoryService; import com.dlink.service.JarService; import com.dlink.service.JobHistoryService; import com.dlink.service.JobInstanceService; @@ -101,6 +104,8 @@ public class TaskServiceImpl extends SuperServiceImpl implemen private AlertGroupService alertGroupService; @Autowired private AlertHistoryService alertHistoryService; + @Autowired + private HistoryService historyService; @Value("${spring.datasource.driver-class-name}") private String driver; @@ -581,30 +586,53 @@ private JobConfig buildJobConfig(Task task) { @Override public JobInstance refreshJobInstance(Integer id, boolean isCoercive) { - JobInstance jobInstance = jobInstanceService.getById(id); - Asserts.checkNull(jobInstance, "该任务实例不存在"); - if (!isCoercive && !inRefreshPlan(jobInstance)) { - return jobInstance; + JobInfoDetail jobInfoDetail; + FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance(); + String key = id.toString(); + if (pool.exist(key)) { + jobInfoDetail = pool.get(key); + } else { + jobInfoDetail = new JobInfoDetail(id); + JobInstance jobInstance = jobInstanceService.getById(id); + Asserts.checkNull(jobInstance, "该任务实例不存在"); + jobInfoDetail.setInstance(jobInstance); + Cluster cluster = clusterService.getById(jobInstance.getClusterId()); + jobInfoDetail.setCluster(cluster); + History history = historyService.getById(jobInstance.getHistoryId()); + history.setConfig(JSONUtil.parseObject(history.getConfigJson())); + if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) { + jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); + } + jobInfoDetail.setHistory(history); + pool.push(key, jobInfoDetail); } - String status = jobInstance.getStatus(); - Cluster cluster = clusterService.getById(jobInstance.getClusterId()); - JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, cluster.getJobManagerHost(), jobInstance.getJid()); + if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) { + return jobInfoDetail.getInstance(); + } + JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(), jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave()); JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson); - if (Asserts.isNull(jobHistory.getJob()) || jobHistory.getJob().has(FlinkRestResultConstant.ERRORS)) { - jobInstance.setStatus(JobStatus.UNKNOWN.getValue()); + jobInfoDetail.setJobHistory(jobHistory); + String status = jobInfoDetail.getInstance().getStatus(); + boolean jobStatusChanged = false; + if (Asserts.isNull(jobInfoDetail.getJobHistory().getJob()) || jobInfoDetail.getJobHistory().getJob().has(FlinkRestResultConstant.ERRORS)) { + jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue()); } else { - jobInstance.setDuration(jobHistory.getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000); - jobInstance.setStatus(jobHistory.getJob().get(FlinkRestResultConstant.JOB_STATE).asText()); + jobInfoDetail.getInstance().setDuration(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000); + jobInfoDetail.getInstance().setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText()); } - if (JobStatus.isDone(jobInstance.getStatus()) && !status.equals(jobInstance.getStatus())) { - jobInstance.setFinishTime(LocalDateTime.now()); - handleJobDone(jobInstance); + if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && !status.equals(jobInfoDetail.getInstance().getStatus())) { + jobStatusChanged = true; + jobInfoDetail.getInstance().setFinishTime(LocalDateTime.now()); + handleJobDone(jobInfoDetail.getInstance()); } if (isCoercive) { - DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId())); + DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInfoDetail.getInstance().getId())); + } + if (jobStatusChanged || jobInfoDetail.isNeedSave()) { + jobInstanceService.updateById(jobInfoDetail.getInstance()); } - jobInstanceService.updateById(jobInstance); - return jobInstance; + pool.refresh(jobInfoDetail); + return jobInfoDetail.getInstance(); } private boolean inRefreshPlan(JobInstance jobInstance) { diff --git a/dlink-common/src/main/java/com/dlink/pool/AbstractPool.java b/dlink-common/src/main/java/com/dlink/pool/AbstractPool.java new file mode 100644 index 0000000000..fe5144c411 --- /dev/null +++ b/dlink-common/src/main/java/com/dlink/pool/AbstractPool.java @@ -0,0 +1,38 @@ +package com.dlink.pool; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * AbstractPool + * + * @author wenmo + * @since 2022/5/28 19:40 + */ +public abstract class AbstractPool{ + + public abstract Map getMap(); + + public boolean exist(String key) { + if (getMap().containsKey(key)) { + return true; + } + return false; + } + + public int push(String key, T entity) { + getMap().put(key, entity); + return getMap().size(); + } + + public int remove(String key) { + getMap().remove(key); + return getMap().size(); + } + + public T get(String key) { + return getMap().get(key); + } + + public abstract void refresh(T entity); +}