Skip to content

Commit

Permalink
[Feature-545][admin] Add Task Pool to solve frequent database writes
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed May 29, 2022
1 parent 5c696e9 commit ffa17ad
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package com.dlink.controller;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Jar;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.dlink.service.TaskService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* JobInstanceController
Expand All @@ -35,7 +42,7 @@ public class JobInstanceController {
*/
@PostMapping
public ProTableResult<JobInstance> listJobInstances(@RequestBody JsonNode para) {
    // Thin controller: paging/filtering and status enrichment are handled
    // entirely by the service layer.
    final ProTableResult<JobInstance> result = jobInstanceService.listJobInstances(para);
    return result;
}

/**
Expand Down
17 changes: 10 additions & 7 deletions dlink-admin/src/main/java/com/dlink/job/FlinkJobTask.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package com.dlink.job;

import java.time.Duration;
import java.time.LocalDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;

import com.dlink.assertion.Asserts;
import com.dlink.context.SpringContextUtils;
import com.dlink.daemon.constant.FlinkTaskConstant;
Expand All @@ -9,12 +16,6 @@
import com.dlink.model.JobInstance;
import com.dlink.model.JobStatus;
import com.dlink.service.TaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;

import java.time.Duration;
import java.time.LocalDateTime;

@DependsOn("springContextUtils")
public class FlinkJobTask implements DaemonTask {
Expand Down Expand Up @@ -54,8 +55,10 @@ public void dealTask() {
preDealTime = System.currentTimeMillis();
JobInstance jobInstance = taskService.refreshJobInstance(config.getId(), false);
if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime())
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
DefaultThreadPool.getInstance().execute(this);
} else {
FlinkJobTaskPool.getInstance().remove(config.getId().toString());
}
}
}
33 changes: 33 additions & 0 deletions dlink-admin/src/main/java/com/dlink/job/FlinkJobTaskPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.dlink.job;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.dlink.model.JobInfoDetail;
import com.dlink.pool.AbstractPool;

/**
* FlinkJobTaskPool
*
* @author wenmo
* @since 2022/5/28 16:39
*/
/**
 * In-memory pool of running Flink job details, used to batch state refreshes
 * and avoid writing to the database on every poll cycle.
 *
 * <p>Thread-safety: the backing map is a {@link ConcurrentHashMap} and both
 * static fields are {@code final}, which guarantees safe publication of the
 * eagerly-created singleton without needing {@code volatile}.
 */
public class FlinkJobTaskPool extends AbstractPool<JobInfoDetail> {

    // Shared store keyed by job-instance id (as a String); concurrent map
    // makes per-entry access safe without extra locking.
    private static final Map<String, JobInfoDetail> flinkJobTaskEntityMap = new ConcurrentHashMap<>();

    // Eagerly-initialized singleton; final ensures safe publication across threads.
    private static final FlinkJobTaskPool instance = new FlinkJobTaskPool();

    /** @return the process-wide pool instance */
    public static FlinkJobTaskPool getInstance() {
        return instance;
    }

    @Override
    public Map<String, JobInfoDetail> getMap() {
        return flinkJobTaskEntityMap;
    }

    /**
     * Advances the entity's internal refresh counter.
     *
     * <p>NOTE(review): this only delegates to {@link JobInfoDetail#refresh()};
     * the pool map itself is not modified here.
     */
    public void refresh(JobInfoDetail entity) {
        entity.refresh();
    }
}
13 changes: 13 additions & 0 deletions dlink-admin/src/main/java/com/dlink/model/JobInfoDetail.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ public class JobInfoDetail {
private ClusterConfiguration clusterConfiguration;
private History history;
private JobHistory jobHistory;
private Integer refreshCount;

// Creates a detail holder for the given job-instance id; the refresh
// counter starts at zero (consumed by refresh()/isNeedSave() to throttle
// database writes).
public JobInfoDetail(Integer id) {
this.id = id;
this.refreshCount = 0;
}

public Integer getId() {
Expand Down Expand Up @@ -66,4 +68,15 @@ public JobHistory getJobHistory() {
// Plain setter: replaces the cached job history snapshot for this detail.
public void setJobHistory(JobHistory jobHistory) {
this.jobHistory = jobHistory;
}

// Bumps the refresh counter, then rolls it back to zero once it reaches a
// multiple of 60 (i.e. exactly when isNeedSave() becomes true). Net effect:
// a persist is signalled roughly once per 60 refresh cycles.
public void refresh() {
refreshCount = refreshCount + 1;
if (isNeedSave()) {
refreshCount = 0;
}
}

// True when the counter sits on a multiple of 60 — note this includes the
// initial 0 before the first refresh() call, so a brand-new detail reports
// "needs save". NOTE(review): callers appear expected to invoke refresh()
// before consulting this flag — confirm against the polling loop.
public boolean isNeedSave() {
return refreshCount % 60 == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ public interface JobHistoryService extends ISuperService<JobHistory> {

JobHistory getJobHistoryInfo(JobHistory jobHistory);

JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId);
JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.dlink.service;

import java.util.List;

import com.dlink.common.result.ProTableResult;
import com.dlink.db.service.ISuperService;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceStatus;

import java.util.List;
import com.fasterxml.jackson.databind.JsonNode;

/**
* JobInstanceService
Expand All @@ -27,4 +29,6 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
LineageResult getLineage(Integer id);

JobInstance getJobInstanceByTaskId(Integer id);

ProTableResult<JobInstance> listJobInstances(JsonNode para);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.dlink.service.impl;

import org.springframework.stereotype.Service;

import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
Expand All @@ -8,7 +10,6 @@
import com.dlink.service.JobHistoryService;
import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.stereotype.Service;

/**
* JobHistoryServiceImpl
Expand Down Expand Up @@ -64,7 +65,7 @@ public JobHistory getJobHistoryInfo(JobHistory jobHistory) {
}

@Override
public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId) {
public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jobId, boolean needSave) {
JobHistory jobHistory = new JobHistory();
jobHistory.setId(id);
try {
Expand All @@ -78,10 +79,12 @@ public JobHistory refreshJobHistory(Integer id, String jobManagerHost, String jo
jobHistory.setCheckpointsJson(JSONUtil.toJsonString(checkPoints));
jobHistory.setCheckpointsConfigJson(JSONUtil.toJsonString(checkPointsConfig));
jobHistory.setConfigJson(JSONUtil.toJsonString(jobsConfig));
if (Asserts.isNotNull(getById(id))) {
updateById(jobHistory);
} else {
save(jobHistory);
if (needSave) {
if (Asserts.isNotNull(getById(id))) {
updateById(jobHistory);
} else {
save(jobHistory);
}
}
} catch (Exception e) {
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
package com.dlink.service.impl;

import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.dlink.assertion.Asserts;
import com.dlink.common.result.ProTableResult;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.db.util.ProTableUtil;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.job.FlinkJobTaskPool;
import com.dlink.mapper.JobInstanceMapper;
import com.dlink.model.*;
import com.dlink.service.*;
import com.dlink.model.History;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.JobInstanceCount;
import com.dlink.model.JobInstanceStatus;
import com.dlink.model.JobStatus;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService;
import com.dlink.service.JobHistoryService;
import com.dlink.service.JobInstanceService;
import com.dlink.utils.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* JobInstanceServiceImpl
Expand Down Expand Up @@ -99,15 +117,22 @@ public JobInfoDetail getJobInfoDetail(Integer id) {
@Override
public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {
Asserts.checkNull(jobInstance, "该任务实例不存在");
JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId());
jobInfoDetail.setInstance(jobInstance);
jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId()));
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId()));
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
JobInfoDetail jobInfoDetail;
FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance();
String key = jobInstance.getId().toString();
if (pool.exist(key)) {
jobInfoDetail = pool.get(key);
} else {
jobInfoDetail = new JobInfoDetail(jobInstance.getId());
jobInfoDetail.setInstance(jobInstance);
jobInfoDetail.setCluster(clusterService.getById(jobInstance.getClusterId()));
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(jobInstance.getId()));
History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
jobInfoDetail.setHistory(history);
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
}
return jobInfoDetail;
}
Expand All @@ -123,4 +148,27 @@ public JobInstance getJobInstanceByTaskId(Integer id) {
return baseMapper.getJobInstanceByTaskId(id);
}

@Override
public ProTableResult<JobInstance> listJobInstances(JsonNode para) {
    // Paging parameters with ProTable defaults.
    Integer current = para.has("current") ? para.get("current").asInt() : 1;
    Integer pageSize = para.has("pageSize") ? para.get("pageSize").asInt() : 10;
    QueryWrapper<JobInstance> queryWrapper = new QueryWrapper<>();
    ProTableUtil.autoQueryDefalut(para, queryWrapper);
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Object> param = mapper.convertValue(para, Map.class);
    Page<JobInstance> page = new Page<>(current, pageSize);
    List<JobInstance> list = baseMapper.selectForProTable(page, queryWrapper, param);
    // Overlay the freshest in-memory state from the task pool so the UI sees
    // live status without waiting for the next database flush.
    FlinkJobTaskPool pool = FlinkJobTaskPool.getInstance();
    for (JobInstance jobInstance : list) {
        String key = jobInstance.getId().toString();
        if (!pool.exist(key)) {
            continue;
        }
        // Look up the pooled instance ONCE per row (the previous code did five
        // separate pool.get() calls per row, which was wasteful and could mix
        // fields from different snapshots under concurrent pool updates).
        JobInstance latest = pool.get(key).getInstance();
        jobInstance.setStatus(latest.getStatus());
        jobInstance.setUpdateTime(latest.getUpdateTime());
        jobInstance.setFinishTime(latest.getFinishTime());
        jobInstance.setError(latest.getError());
        jobInstance.setDuration(latest.getDuration());
    }
    return ProTableResult.<JobInstance>builder().success(true).data(list)
        .total(page.getTotal()).current(current).pageSize(pageSize).build();
}

}
Loading

0 comments on commit ffa17ad

Please sign in to comment.