Skip to content

Commit

Permalink
[Fix](Job)Fixed job scheduling missing certain time window schedules (a…
Browse files Browse the repository at this point in the history
…pache#28659)

Since scheduling itself consumes a certain amount of time, the start time of the time window should not be the current time, but the end time of the last schedule.
  • Loading branch information
CalvinKirs authored Dec 20, 2023
1 parent fb3b0af commit 9aa878e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ private List<Long> getExecutionDelaySeconds(long windowStartTimeMs, long windowE

// Calculate the trigger time list
for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) {
if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs()
|| triggerTime < timerDefinition.getEndTimeMs())) {
if (null == timerDefinition.getEndTimeMs()
|| triggerTime < timerDefinition.getEndTimeMs()) {
timerDefinition.setLatestSchedulerTimeMs(triggerTime);
timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void scheduleOneJob(T job) throws JobException {
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
//if it's timer job and trigger last window already start, we will scheduler it immediately
cycleTimerJobScheduler(job);
cycleTimerJobScheduler(job, System.currentTimeMillis());
}

@Override
Expand All @@ -139,9 +139,9 @@ public void close() throws IOException {
}


private void cycleTimerJobScheduler(T job) {
private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
System.currentTimeMillis(), latestBatchSchedulerTimerTaskTimeMs);
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isNotEmpty(delaySeconds)) {
delaySeconds.forEach(delaySecond -> {
TimerJobSchedulerTask<T> timerJobSchedulerTask = new TimerJobSchedulerTask<>(timerJobDisruptor, job);
Expand Down Expand Up @@ -170,6 +170,8 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) {
* We will get the task in the next time window, and then hand it over to the time wheel for timing trigger
*/
private void executeTimerJobIdsWithinLastTenMinutesWindow() {

long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
}
Expand All @@ -186,7 +188,7 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {
if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
continue;
}
cycleTimerJobScheduler(job);
cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testGetTriggerDelayTimesRecurring() {
Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray());
delayTimes = configuration.getTriggerDelayTimes(
1001000L, 0L, 1000000L);
Assertions.assertEquals(0, delayTimes.size());
Assertions.assertEquals(1, delayTimes.size());
}

}

0 comments on commit 9aa878e

Please sign in to comment.