From fad74461152920028c1b5155591a405ffd2361df Mon Sep 17 00:00:00 2001 From: chembohuang Date: Thu, 5 Jan 2017 18:14:36 +0800 Subject: [PATCH] #54 reduce zookeeper load by optimizing the read/write during the execution of the job --- .../saturn/job/basic/AbstractElasticJob.java | 10 ++--- .../vip/saturn/job/basic/JobScheduler.java | 30 ++++++------- .../control/ControlListenerManager.java | 6 +-- .../job/internal/control/ExecutionInfo.java | 12 ++++- ...ControlService.java => ReportService.java} | 18 +++++--- .../job/internal/execution/ExecutionNode.java | 2 +- .../internal/execution/ExecutionService.java | 45 ++++++++++--------- 7 files changed, 68 insertions(+), 55 deletions(-) rename saturn-core/src/main/java/com/vip/saturn/job/internal/control/{ControlService.java => ReportService.java} (82%) diff --git a/saturn-core/src/main/java/com/vip/saturn/job/basic/AbstractElasticJob.java b/saturn-core/src/main/java/com/vip/saturn/job/basic/AbstractElasticJob.java index 7eae234c4..31a5a64ff 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/basic/AbstractElasticJob.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/basic/AbstractElasticJob.java @@ -27,7 +27,7 @@ import com.vip.saturn.job.executor.SaturnExecutorService; import com.vip.saturn.job.internal.config.ConfigurationService; -import com.vip.saturn.job.internal.control.ControlService; +import com.vip.saturn.job.internal.control.ReportService; import com.vip.saturn.job.internal.execution.ExecutionContextService; import com.vip.saturn.job.internal.execution.ExecutionNode; import com.vip.saturn.job.internal.execution.ExecutionService; @@ -84,7 +84,7 @@ public abstract class AbstractElasticJob implements Stopable { protected SaturnExecutorService saturnExecutorService; - protected ControlService controlService; + protected ReportService reportService; /** @@ -193,7 +193,7 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi serverService.updateServerStatus(ServerStatus.READY);// server状态只需更新一次 updateServerStatus = true; } - executionService.registerJobCompleted(shardingContext, item); + executionService.registerJobCompletedByItem(shardingContext, item); } if (isFailoverSupported() && configService.isFailover()) { failoverService.updateFailoverComplete(item); @@ -267,8 +267,8 @@ protected void setServerService(ServerService serverService) { this.serverService = serverService; } - protected void setControlService(ControlService controlService) { - this.controlService = controlService; + protected void setReportService(ReportService reportService) { + this.reportService = reportService; } /** diff --git a/saturn-core/src/main/java/com/vip/saturn/job/basic/JobScheduler.java b/saturn-core/src/main/java/com/vip/saturn/job/basic/JobScheduler.java index 15e7da40b..a689d1174 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/basic/JobScheduler.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/basic/JobScheduler.java @@ -28,11 +28,10 @@ import com.vip.saturn.job.internal.analyse.AnalyseService; import com.vip.saturn.job.internal.config.ConfigurationService; import com.vip.saturn.job.internal.config.JobConfiguration; -import com.vip.saturn.job.internal.control.ControlService; +import com.vip.saturn.job.internal.control.ReportService; import com.vip.saturn.job.internal.election.LeaderElectionService; import com.vip.saturn.job.internal.execution.ExecutionContextService; import com.vip.saturn.job.internal.execution.ExecutionService; -import com.vip.saturn.job.internal.execution.NextFireTimePausePeriodEffected; import com.vip.saturn.job.internal.failover.FailoverService; import com.vip.saturn.job.internal.listener.ListenerManager; import com.vip.saturn.job.internal.offset.OffsetService; @@ -71,7 +70,7 @@ public class JobScheduler { private final ServerService serverService; - private final ControlService controlService; + private final ReportService reportService; private final ShardingService shardingService; @@ -116,7 +115,7 @@ public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter, analyseService = new AnalyseService(this); limitMaxJobsService = new LimitMaxJobsService(this); listenerManager = new ListenerManager(this); - controlService = new ControlService(this); + reportService = new ReportService(this); // see EnabledPathListener.java, only these values are supposed to be watched. previousConf.setCron(jobConfiguration.getCron()); @@ -183,7 +182,7 @@ private void createJob() throws SchedulerException { job.setOffsetService(offsetService); job.setServerService(serverService); job.setExecutorName(executorName); - job.setControlService(controlService); + job.setReportService(reportService); job.setJobName(jobName); job.setNamespace(coordinatorRegistryCenter.getNamespace()); job.setSaturnExecutorService(saturnExecutorService); @@ -195,31 +194,28 @@ private void createJob() throws SchedulerException { * * @return 下次作业触发时间 */ - public NextFireTimePausePeriodEffected getNextFireTimePausePeriodEffected() { - NextFireTimePausePeriodEffected result = new NextFireTimePausePeriodEffected(); + public Date getNextFireTimePausePeriodEffected() { SaturnScheduler saturnScheduler = job.getScheduler(); if(saturnScheduler == null){ - return result; + return null; } Trigger trigger = saturnScheduler.getTrigger(); if (trigger == null) { - return result; + return null; } + ((OperableTrigger) trigger).updateAfterMisfire(null); Date nextFireTime = trigger.getNextFireTime(); while (nextFireTime != null && configService.isInPausePeriod(nextFireTime)) { nextFireTime = trigger.getFireTimeAfter(nextFireTime); } if (null == nextFireTime) { - return result; - } - if (null == result.getNextFireTime() || nextFireTime.getTime() < result.getNextFireTime().getTime()) { - result.setNextFireTime(nextFireTime); + return null; } - return result; + return nextFireTime; } - + /** * 停止作业. * @param stopJob 是否强制停止作业 @@ -378,8 +374,8 @@ public ConfigurationService getConfigService() { return configService; } - public ControlService getControlService() { - return controlService; + public ReportService getReportService() { + return reportService; } public LeaderElectionService getLeaderElectionService() { diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlListenerManager.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlListenerManager.java index 1b8fbfbea..f6f5f5f02 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlListenerManager.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlListenerManager.java @@ -33,11 +33,11 @@ public class ControlListenerManager extends AbstractListenerManager { private boolean isShutdown = false; - private ControlService controlService; + private ReportService reportService; public ControlListenerManager(JobScheduler jobScheduler) { super(jobScheduler); - controlService = jobScheduler.getControlService(); + reportService = jobScheduler.getReportService(); } @Override @@ -57,7 +57,7 @@ protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String if(isShutdown) return; if (ControlNode.isReportPath(jobName, path) && (Type.NODE_UPDATED == event.getType() || Type.NODE_ADDED == event.getType())) { log.info("[{}] msg={} received report event from console, start to flush data to zk.", jobName, jobName); - controlService.reportData2Zk(); + reportService.reportData2Zk(); } } } diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ExecutionInfo.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ExecutionInfo.java index 4fcb5adfa..d0f346be8 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ExecutionInfo.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ExecutionInfo.java @@ -31,10 +31,20 @@ public final class ExecutionInfo implements Serializable { private Long lastCompleteTime; + private Long nextFireTime; + /** 作业分片运行日志 */ private String jobLog; - public ExecutionInfo(int item, Long lastBeginTime) { + public Long getNextFireTime() { + return nextFireTime; + } + + public void setNextFireTime(Long nextFireTime) { + this.nextFireTime = nextFireTime; + } + + public ExecutionInfo(int item, Long lastBeginTime) { this.item = item; this.lastBeginTime = lastBeginTime; } diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlService.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ReportService.java similarity index 82% rename from saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlService.java rename to saturn-core/src/main/java/com/vip/saturn/job/internal/control/ReportService.java index 2b8826a90..52c299d27 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ControlService.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/control/ReportService.java @@ -17,10 +17,10 @@ package com.vip.saturn.job.internal.control; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +33,13 @@ * @author chembo.huang * */ -public class ControlService extends AbstractSaturnService { +public class ReportService extends AbstractSaturnService { - static Logger log = LoggerFactory.getLogger(ControlService.class); + static Logger log = LoggerFactory.getLogger(ReportService.class); - public Map infoMap = new ConcurrentHashMap<>(); + public Map infoMap = new HashMap<>(); - public ControlService(JobScheduler jobScheduler) { + public ReportService(JobScheduler jobScheduler) { super(jobScheduler); } @@ -56,6 +56,9 @@ public void reportData2Zk() { if (info.getLastCompleteTime() != null) { jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getLastCompleteTimeNode(item), info.getLastCompleteTime()); } + if (info.getNextFireTime() != null) { + jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getNextFireTimeNode(item), info.getNextFireTime()); + } jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getJobLog(item), (info.getJobLog() == null?"":info.getJobLog())); jobScheduler.getJobNodeStorage().replaceJobNode(ExecutionNode.getJobMsg(item), (info.getJobMsg() == null?"":info.getJobMsg())); log.info("done flushing {} to zk.", info); @@ -70,9 +73,12 @@ public void clearInfoMap() { } } - public void initInfoOnBegin(int item) { + public void initInfoOnBegin(int item, Long nextFireTime) { synchronized (infoMap) { ExecutionInfo info = new ExecutionInfo(item, System.currentTimeMillis()); + if (nextFireTime != null) { + info.setNextFireTime(nextFireTime); + } infoMap.put(item, info); } } diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionNode.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionNode.java index fa8a8cd75..68896ad78 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionNode.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionNode.java @@ -104,7 +104,7 @@ public static String getLastBeginTimeNode(final int item) { return String.format(LAST_BEGIN_TIME, item); } - static String getNextFireTimeNode(final int item) { + public static String getNextFireTimeNode(final int item) { return String.format(NEXT_FIRE_TIME, item); } diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionService.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionService.java index ca9703252..cfc2603da 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionService.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/execution/ExecutionService.java @@ -17,6 +17,7 @@ package com.vip.saturn.job.internal.execution; +import java.util.Date; import java.util.List; import org.slf4j.Logger; @@ -31,8 +32,8 @@ import com.vip.saturn.job.basic.JobScheduler; import com.vip.saturn.job.basic.SaturnExecutionContext; import com.vip.saturn.job.internal.config.ConfigurationService; -import com.vip.saturn.job.internal.control.ControlService; import com.vip.saturn.job.internal.control.ExecutionInfo; +import com.vip.saturn.job.internal.control.ReportService; import com.vip.saturn.job.internal.failover.FailoverNode; import com.vip.saturn.job.internal.server.ServerService; import com.vip.saturn.job.internal.server.ServerStatus; @@ -51,7 +52,7 @@ public class ExecutionService extends AbstractSaturnService { private ServerService serverService; - private ControlService controlService; + private ReportService reportService; public ExecutionService(final JobScheduler jobScheduler) { super(jobScheduler); @@ -61,7 +62,7 @@ public ExecutionService(final JobScheduler jobScheduler) { public void start(){ configService = jobScheduler.getConfigService(); serverService = jobScheduler.getServerService(); - controlService = jobScheduler.getControlService(); + reportService = jobScheduler.getReportService(); } /** @@ -72,19 +73,19 @@ public void start(){ public void updateNextFireTime(final List shardingItems) { if (!shardingItems.isEmpty()) { for (int item : shardingItems) { - updateNextFireTime(item); + updateNextFireTimeByItem(item); } } } - private void updateNextFireTime(int item) { + private void updateNextFireTimeByItem(int item) { if (null == jobScheduler) { return; } - NextFireTimePausePeriodEffected nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected(); - if (null != nextFireTimePausePeriodEffected && null != nextFireTimePausePeriodEffected.getNextFireTime()) { + Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected(); + if (null != nextFireTimePausePeriodEffected) { //String pausePeriodEffectedNode = ExecutionNode.getPausePeriodEffectedNode(item); - getJobNodeStorage().replaceJobNode(ExecutionNode.getNextFireTimeNode(item), nextFireTimePausePeriodEffected.getNextFireTime().getTime()); + getJobNodeStorage().replaceJobNode(ExecutionNode.getNextFireTimeNode(item), nextFireTimePausePeriodEffected.getTime()); //getJobNodeStorage().replaceJobNode(pausePeriodEffectedNode, nextFireTimePausePeriodEffected.isPausePeriodEffected()); } } @@ -98,10 +99,12 @@ public void registerJobBegin(final JobExecutionMultipleShardingContext jobExecut List shardingItems = jobExecutionShardingContext.getShardingItems(); if (!shardingItems.isEmpty()) { serverService.updateServerStatus(ServerStatus.RUNNING); - controlService.clearInfoMap(); + reportService.clearInfoMap(); + Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected(); + Long nextFireTime = nextFireTimePausePeriodEffected == null?null:nextFireTimePausePeriodEffected.getTime(); for (int item : shardingItems) { registerJobBeginByItem(jobExecutionShardingContext, item); - controlService.initInfoOnBegin(item); + reportService.initInfoOnBegin(item, nextFireTime); } } } @@ -124,23 +127,16 @@ public void registerJobBeginByItem(final JobExecutionMultipleShardingContext job //getJobNodeStorage().replaceJobNode(ExecutionNode.getLastBeginTimeNode(item), System.currentTimeMillis()); - updateNextFireTime(item); + //updateNextFireTimeAndPausePeriodEffected(item); } /** * 注册作业完成信息. * - * @param jobExecutionShardingContext 作业运行时分片上下文 - * @param item 分片项 */ - public void registerJobCompleted(final JobExecutionMultipleShardingContext jobExecutionShardingContext,Integer item) { - registerJobCompletedByItem(jobExecutionShardingContext, item); - } - public void registerJobCompletedByItem(final JobExecutionMultipleShardingContext jobExecutionShardingContext, int item) { - ExecutionInfo info = controlService.getInfoByItem(item); - // old data has been flushed to zk. - if (info == null) { + ExecutionInfo info = reportService.getInfoByItem(item); + if (info == null) { // old data has been flushed to zk. info = new ExecutionInfo(item); } if (jobExecutionShardingContext instanceof SaturnExecutionContext) { @@ -197,7 +193,7 @@ public void registerJobCompletedByItem(final JobExecutionMultipleShardingContext } } } - updateNextFireTime(item); + //updateNextFireTimeAndPausePeriodEffected(item); if(jobConfiguration.isEnabledReport() == null){ if("JAVA_JOB".equals(jobConfiguration.getJobType()) || "SHELL_JOB".equals(jobConfiguration.getJobType())){ getJobNodeStorage().createJobNodeIfNeeded(ExecutionNode.getCompletedNode(item)); @@ -206,8 +202,13 @@ public void registerJobCompletedByItem(final JobExecutionMultipleShardingContext getJobNodeStorage().createJobNodeIfNeeded(ExecutionNode.getCompletedNode(item)); } getJobNodeStorage().removeJobNodeIfExisted(ExecutionNode.getRunningNode(item)); + + Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected(); + if (null != nextFireTimePausePeriodEffected) { + info.setNextFireTime(nextFireTimePausePeriodEffected.getTime()); + } info.setLastCompleteTime(System.currentTimeMillis()); - controlService.fillInfoOnAfter(info); + reportService.fillInfoOnAfter(info); //getJobNodeStorage().replaceJobNode(ExecutionNode.getLastCompleteTimeNode(item), System.currentTimeMillis()); }