diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 16b9dd24281eec..0b44073464f7a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -178,8 +178,8 @@ private List getExecutionDelaySeconds(long windowStartTimeMs, long windowE // Calculate the trigger time list for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) { - if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs() - || triggerTime < timerDefinition.getEndTimeMs())) { + if (null == timerDefinition.getEndTimeMs() + || triggerTime < timerDefinition.getEndTimeMs()) { timerDefinition.setLatestSchedulerTimeMs(triggerTime); timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 47e91d97b49a88..08bbbb6dbaba9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -130,7 +130,7 @@ public void scheduleOneJob(T job) throws JobException { schedulerInstantJob(job, TaskType.SCHEDULED, null); } //if it's timer job and trigger last window already start, we will scheduler it immediately - cycleTimerJobScheduler(job); + cycleTimerJobScheduler(job, System.currentTimeMillis()); } @Override @@ -139,9 +139,9 @@ public void close() throws IOException { } - private void cycleTimerJobScheduler(T job) { + private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { List delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), - System.currentTimeMillis(), latestBatchSchedulerTimerTaskTimeMs); + startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isNotEmpty(delaySeconds)) { delaySeconds.forEach(delaySecond -> { TimerJobSchedulerTask timerJobSchedulerTask = new TimerJobSchedulerTask<>(timerJobDisruptor, job); @@ -170,6 +170,8 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) { * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger */ private void executeTimerJobIdsWithinLastTenMinutesWindow() { + + long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); } @@ -186,7 +188,7 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() { if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { continue; } - cycleTimerJobScheduler(job); + cycleTimerJobScheduler(job, lastTimeWindowMs); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 87d1430375ad2c..91678ee5c1d2d1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -64,7 +64,7 @@ public void testGetTriggerDelayTimesRecurring() { Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( 1001000L, 0L, 1000000L); - Assertions.assertEquals(0, delayTimes.size()); + Assertions.assertEquals(1, delayTimes.size()); } }