Skip to content

Commit

Permalink
#54 reduce zookeeper load by optimizing the read/write during the exe…
Browse files Browse the repository at this point in the history
…cution of the job
  • Loading branch information
chembohuang committed Jan 5, 2017
1 parent eb86230 commit fad7446
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import com.vip.saturn.job.executor.SaturnExecutorService;
import com.vip.saturn.job.internal.config.ConfigurationService;
import com.vip.saturn.job.internal.control.ControlService;
import com.vip.saturn.job.internal.control.ReportService;
import com.vip.saturn.job.internal.execution.ExecutionContextService;
import com.vip.saturn.job.internal.execution.ExecutionNode;
import com.vip.saturn.job.internal.execution.ExecutionService;
Expand Down Expand Up @@ -84,7 +84,7 @@ public abstract class AbstractElasticJob implements Stopable {

protected SaturnExecutorService saturnExecutorService;

protected ControlService controlService;
protected ReportService reportService;


/**
Expand Down Expand Up @@ -193,7 +193,7 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi
serverService.updateServerStatus(ServerStatus.READY);// server状态只需更新一次
updateServerStatus = true;
}
executionService.registerJobCompleted(shardingContext, item);
executionService.registerJobCompletedByItem(shardingContext, item);
}
if (isFailoverSupported() && configService.isFailover()) {
failoverService.updateFailoverComplete(item);
Expand Down Expand Up @@ -267,8 +267,8 @@ protected void setServerService(ServerService serverService) {
this.serverService = serverService;
}

protected void setControlService(ControlService controlService) {
this.controlService = controlService;
protected void setReportService(ReportService reportService) {
this.reportService = reportService;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import com.vip.saturn.job.internal.analyse.AnalyseService;
import com.vip.saturn.job.internal.config.ConfigurationService;
import com.vip.saturn.job.internal.config.JobConfiguration;
import com.vip.saturn.job.internal.control.ControlService;
import com.vip.saturn.job.internal.control.ReportService;
import com.vip.saturn.job.internal.election.LeaderElectionService;
import com.vip.saturn.job.internal.execution.ExecutionContextService;
import com.vip.saturn.job.internal.execution.ExecutionService;
import com.vip.saturn.job.internal.execution.NextFireTimePausePeriodEffected;
import com.vip.saturn.job.internal.failover.FailoverService;
import com.vip.saturn.job.internal.listener.ListenerManager;
import com.vip.saturn.job.internal.offset.OffsetService;
Expand Down Expand Up @@ -71,7 +70,7 @@ public class JobScheduler {

private final ServerService serverService;

private final ControlService controlService;
private final ReportService reportService;

private final ShardingService shardingService;

Expand Down Expand Up @@ -116,7 +115,7 @@ public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter,
analyseService = new AnalyseService(this);
limitMaxJobsService = new LimitMaxJobsService(this);
listenerManager = new ListenerManager(this);
controlService = new ControlService(this);
reportService = new ReportService(this);

// see EnabledPathListener.java, only these values are supposed to be watched.
previousConf.setCron(jobConfiguration.getCron());
Expand Down Expand Up @@ -183,7 +182,7 @@ private void createJob() throws SchedulerException {
job.setOffsetService(offsetService);
job.setServerService(serverService);
job.setExecutorName(executorName);
job.setControlService(controlService);
job.setReportService(reportService);
job.setJobName(jobName);
job.setNamespace(coordinatorRegistryCenter.getNamespace());
job.setSaturnExecutorService(saturnExecutorService);
Expand All @@ -195,31 +194,28 @@ private void createJob() throws SchedulerException {
*
* @return 下次作业触发时间
*/
public NextFireTimePausePeriodEffected getNextFireTimePausePeriodEffected() {
NextFireTimePausePeriodEffected result = new NextFireTimePausePeriodEffected();
public Date getNextFireTimePausePeriodEffected() {
SaturnScheduler saturnScheduler = job.getScheduler();
if(saturnScheduler == null){
return result;
return null;
}
Trigger trigger = saturnScheduler.getTrigger();

if (trigger == null) {
return result;
return null;
}

((OperableTrigger) trigger).updateAfterMisfire(null);
Date nextFireTime = trigger.getNextFireTime();
while (nextFireTime != null && configService.isInPausePeriod(nextFireTime)) {
nextFireTime = trigger.getFireTimeAfter(nextFireTime);
}
if (null == nextFireTime) {
return result;
}
if (null == result.getNextFireTime() || nextFireTime.getTime() < result.getNextFireTime().getTime()) {
result.setNextFireTime(nextFireTime);
return null;
}
return result;
return nextFireTime;
}

/**
* 停止作业.
* @param stopJob 是否强制停止作业
Expand Down Expand Up @@ -378,8 +374,8 @@ public ConfigurationService getConfigService() {
return configService;
}

public ControlService getControlService() {
return controlService;
public ReportService getReportService() {
return reportService;
}

public LeaderElectionService getLeaderElectionService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public class ControlListenerManager extends AbstractListenerManager {

private boolean isShutdown = false;

private ControlService controlService;
private ReportService reportService;

public ControlListenerManager(JobScheduler jobScheduler) {
super(jobScheduler);
controlService = jobScheduler.getControlService();
reportService = jobScheduler.getReportService();
}

@Override
Expand All @@ -57,7 +57,7 @@ protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String
if(isShutdown) return;
if (ControlNode.isReportPath(jobName, path) && (Type.NODE_UPDATED == event.getType() || Type.NODE_ADDED == event.getType())) {
log.info("[{}] msg={} received report event from console, start to flush data to zk.", jobName, jobName);
controlService.reportData2Zk();
reportService.reportData2Zk();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,20 @@ public final class ExecutionInfo implements Serializable {

private Long lastCompleteTime;

private Long nextFireTime;

/** 作业分片运行日志 */
private String jobLog;

public ExecutionInfo(int item, Long lastBeginTime) {
public Long getNextFireTime() {
return nextFireTime;
}

public void setNextFireTime(Long nextFireTime) {
this.nextFireTime = nextFireTime;
}

public ExecutionInfo(int item, Long lastBeginTime) {
this.item = item;
this.lastBeginTime = lastBeginTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package com.vip.saturn.job.internal.control;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,13 +33,13 @@
* @author chembo.huang
*
*/
public class ControlService extends AbstractSaturnService {
public class ReportService extends AbstractSaturnService {

static Logger log = LoggerFactory.getLogger(ControlService.class);
static Logger log = LoggerFactory.getLogger(ReportService.class);

public Map<Integer,ExecutionInfo> infoMap = new ConcurrentHashMap<>();
public Map<Integer, ExecutionInfo> infoMap = new HashMap<>();

public ControlService(JobScheduler jobScheduler) {
public ReportService(JobScheduler jobScheduler) {
super(jobScheduler);
}

Expand All @@ -56,6 +56,9 @@ public void reportData2Zk() {
if (info.getLastCompleteTime() != null) {
jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getLastCompleteTimeNode(item), info.getLastCompleteTime());
}
if (info.getNextFireTime() != null) {
jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getNextFireTimeNode(item), info.getNextFireTime());
}
jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getJobLog(item), (info.getJobLog() == null?"":info.getJobLog()));
jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getJobMsg(item), (info.getJobMsg() == null?"":info.getJobMsg()));
log.info("done flushing {} to zk.", info);
Expand All @@ -70,9 +73,12 @@ public void clearInfoMap() {
}
}

public void initInfoOnBegin(int item) {
public void initInfoOnBegin(int item, Long nextFireTime) {
synchronized (infoMap) {
ExecutionInfo info = new ExecutionInfo(item, System.currentTimeMillis());
if (nextFireTime != null) {
info.setNextFireTime(nextFireTime);
}
infoMap.put(item, info);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static String getLastBeginTimeNode(final int item) {
return String.format(LAST_BEGIN_TIME, item);
}

static String getNextFireTimeNode(final int item) {
public static String getNextFireTimeNode(final int item) {
return String.format(NEXT_FIRE_TIME, item);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.vip.saturn.job.internal.execution;

import java.util.Date;
import java.util.List;

import org.slf4j.Logger;
Expand All @@ -31,8 +32,8 @@
import com.vip.saturn.job.basic.JobScheduler;
import com.vip.saturn.job.basic.SaturnExecutionContext;
import com.vip.saturn.job.internal.config.ConfigurationService;
import com.vip.saturn.job.internal.control.ControlService;
import com.vip.saturn.job.internal.control.ExecutionInfo;
import com.vip.saturn.job.internal.control.ReportService;
import com.vip.saturn.job.internal.failover.FailoverNode;
import com.vip.saturn.job.internal.server.ServerService;
import com.vip.saturn.job.internal.server.ServerStatus;
Expand All @@ -51,7 +52,7 @@ public class ExecutionService extends AbstractSaturnService {

private ServerService serverService;

private ControlService controlService;
private ReportService reportService;

public ExecutionService(final JobScheduler jobScheduler) {
super(jobScheduler);
Expand All @@ -61,7 +62,7 @@ public ExecutionService(final JobScheduler jobScheduler) {
public void start(){
configService = jobScheduler.getConfigService();
serverService = jobScheduler.getServerService();
controlService = jobScheduler.getControlService();
reportService = jobScheduler.getReportService();
}

/**
Expand All @@ -72,19 +73,19 @@ public void start(){
public void updateNextFireTime(final List<Integer> shardingItems) {
if (!shardingItems.isEmpty()) {
for (int item : shardingItems) {
updateNextFireTime(item);
updateNextFireTimeByItem(item);
}
}
}

private void updateNextFireTime(int item) {
private void updateNextFireTimeByItem(int item) {
if (null == jobScheduler) {
return;
}
NextFireTimePausePeriodEffected nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected();
if (null != nextFireTimePausePeriodEffected && null != nextFireTimePausePeriodEffected.getNextFireTime()) {
Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected();
if (null != nextFireTimePausePeriodEffected) {
//String pausePeriodEffectedNode = ExecutionNode.getPausePeriodEffectedNode(item);
getJobNodeStorage().replaceJobNode(ExecutionNode.getNextFireTimeNode(item), nextFireTimePausePeriodEffected.getNextFireTime().getTime());
getJobNodeStorage().replaceJobNode(ExecutionNode.getNextFireTimeNode(item), nextFireTimePausePeriodEffected.getTime());
//getJobNodeStorage().replaceJobNode(pausePeriodEffectedNode, nextFireTimePausePeriodEffected.isPausePeriodEffected());
}
}
Expand All @@ -98,10 +99,12 @@ public void registerJobBegin(final JobExecutionMultipleShardingContext jobExecut
List<Integer> shardingItems = jobExecutionShardingContext.getShardingItems();
if (!shardingItems.isEmpty()) {
serverService.updateServerStatus(ServerStatus.RUNNING);
controlService.clearInfoMap();
reportService.clearInfoMap();
Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected();
Long nextFireTime = nextFireTimePausePeriodEffected == null?null:nextFireTimePausePeriodEffected.getTime();
for (int item : shardingItems) {
registerJobBeginByItem(jobExecutionShardingContext, item);
controlService.initInfoOnBegin(item);
reportService.initInfoOnBegin(item, nextFireTime);
}
}
}
Expand All @@ -124,23 +127,16 @@ public void registerJobBeginByItem(final JobExecutionMultipleShardingContext job

//getJobNodeStorage().replaceJobNode(ExecutionNode.getLastBeginTimeNode(item), System.currentTimeMillis());

updateNextFireTime(item);
//updateNextFireTimeAndPausePeriodEffected(item);
}

/**
* 注册作业完成信息.
*
* @param jobExecutionShardingContext 作业运行时分片上下文
* @param item 分片项
*/
public void registerJobCompleted(final JobExecutionMultipleShardingContext jobExecutionShardingContext,Integer item) {
registerJobCompletedByItem(jobExecutionShardingContext, item);
}

public void registerJobCompletedByItem(final JobExecutionMultipleShardingContext jobExecutionShardingContext, int item) {
ExecutionInfo info = controlService.getInfoByItem(item);
// old data has been flushed to zk.
if (info == null) {
ExecutionInfo info = reportService.getInfoByItem(item);
if (info == null) { // old data has been flushed to zk.
info = new ExecutionInfo(item);
}
if (jobExecutionShardingContext instanceof SaturnExecutionContext) {
Expand Down Expand Up @@ -197,7 +193,7 @@ public void registerJobCompletedByItem(final JobExecutionMultipleShardingContext
}
}
}
updateNextFireTime(item);
//updateNextFireTimeAndPausePeriodEffected(item);
if(jobConfiguration.isEnabledReport() == null){
if("JAVA_JOB".equals(jobConfiguration.getJobType()) || "SHELL_JOB".equals(jobConfiguration.getJobType())){
getJobNodeStorage().createJobNodeIfNeeded(ExecutionNode.getCompletedNode(item));
Expand All @@ -206,8 +202,13 @@ public void registerJobCompletedByItem(final JobExecutionMultipleShardingContext
getJobNodeStorage().createJobNodeIfNeeded(ExecutionNode.getCompletedNode(item));
}
getJobNodeStorage().removeJobNodeIfExisted(ExecutionNode.getRunningNode(item));

Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected();
if (null != nextFireTimePausePeriodEffected) {
info.setNextFireTime(nextFireTimePausePeriodEffected.getTime());
}
info.setLastCompleteTime(System.currentTimeMillis());
controlService.fillInfoOnAfter(info);
reportService.fillInfoOnAfter(info);
//getJobNodeStorage().replaceJobNode(ExecutionNode.getLastCompleteTimeNode(item), System.currentTimeMillis());
}

Expand Down

0 comments on commit fad7446

Please sign in to comment.