Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#111433 Watch Next Run Interval Resets On Shard Move or Node Restart #115102

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/115102.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 115102
summary: Watch Next Run Interval Resets On Shard Move or Node Restart
area: Watcher
type: bug
issues:
- 111433
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
Expand All @@ -32,6 +34,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -67,7 +70,11 @@ public synchronized void start(Collection<Watch> jobs) {
Map<String, ActiveSchedule> startingSchedules = Maps.newMapWithExpectedSize(jobs.size());
for (Watch job : jobs) {
if (job.trigger() instanceof ScheduleTrigger trigger) {
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
if (trigger.getSchedule() instanceof IntervalSchedule) {
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), calculateLastStartTime(job)));
} else {
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
}
}
}
// why are we calling putAll() here instead of assigning a brand
Expand Down Expand Up @@ -108,10 +115,39 @@ public void add(Watch watch) {
// watcher indexing listener
// this also means that updating an existing watch would not retrigger the schedule time, if it remains the same schedule
if (currentSchedule == null || currentSchedule.schedule.equals(trigger.getSchedule()) == false) {
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
if (trigger.getSchedule() instanceof IntervalSchedule) {
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), calculateLastStartTime(watch)));
} else {
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
}

}
}

/**
* Attempts to calculate the epoch millis of the last time the watch was checked, If the watch has never been checked, the timestamp of
* the last state change is used. If the watch has never been checked and has never been in an active state, the current time is used.
* @param job the watch to calculate the last start time for
* @return the epoch millis of the last time the watch was checked or now
*/
private long calculateLastStartTime(Watch job) {
var lastChecked = Optional.ofNullable(job)
.map(Watch::status)
.map(WatchStatus::lastChecked)
.map(ZonedDateTime::toInstant)
.map(Instant::toEpochMilli);

return lastChecked.orElseGet(
() -> Optional.ofNullable(job)
.map(Watch::status)
.map(WatchStatus::state)
.map(WatchStatus.State::getTimestamp)
.map(ZonedDateTime::toInstant)
.map(Instant::toEpochMilli)
.orElse(clock.millis())
);
}

@Override
public boolean remove(String jobId) {
logger.debug("Removing watch [{}] from engine (engine is running: {})", jobId, isRunning.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

Expand Down Expand Up @@ -283,6 +285,216 @@ public void testAddOnlyWithNewSchedule() {
assertThat(engine.getSchedules().get("_id"), not(is(activeSchedule)));
}

public void testWatchWithLastCheckedTimeExecutesBeforeInitialInterval() throws Exception {
final var firstLatch = new CountDownLatch(1);
final var secondLatch = new CountDownLatch(1);

var firstExecuted = new AtomicBoolean(false);

Watch watch = new Watch(
"watch",
new ScheduleTrigger(interval("1s")),
new ExecutableNoneInput(),
InternalAlwaysCondition.INSTANCE,
null,
null,
Collections.emptyList(),
null,
new WatchStatus(-1L, null, null, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC), null, null, null),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);

var watches = Collections.singletonList(watch);

engine.register(events -> {
for (TriggerEvent ignored : events) {
if (firstExecuted.get() == false) {
logger.info("job first fire");
firstExecuted.set(true);
firstLatch.countDown();
} else {
logger.info("job second fire");
secondLatch.countDown();
}
}
});

engine.start(watches);
advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}

advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}
engine.stop();
}

public void testWatchWithNoLastCheckedTimeButHasActivationTimeExecutesBeforeInitialInterval() throws Exception {
final var firstLatch = new CountDownLatch(1);
final var secondLatch = new CountDownLatch(1);

var firstExecuted = new AtomicBoolean(false);

Watch watch = new Watch(
"watch",
new ScheduleTrigger(interval("1s")),
new ExecutableNoneInput(),
InternalAlwaysCondition.INSTANCE,
null,
null,
Collections.emptyList(),
null,
new WatchStatus(
-1L,
new WatchStatus.State(true, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC)),
null,
null,
null,
null,
null
),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);

var watches = Collections.singletonList(watch);

engine.register(events -> {
for (TriggerEvent ignored : events) {
if (firstExecuted.get() == false) {
logger.info("job first fire");
firstExecuted.set(true);
firstLatch.countDown();
} else {
logger.info("job second fire");
secondLatch.countDown();
}
}
});

engine.start(watches);
advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}

advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}
engine.stop();
}

public void testAddWithLastCheckedTimeExecutesBeforeInitialInterval() throws Exception {
final var firstLatch = new CountDownLatch(1);
final var secondLatch = new CountDownLatch(1);

var firstExecuted = new AtomicBoolean(false);

Watch watch = new Watch(
"watch",
new ScheduleTrigger(interval("1s")),
new ExecutableNoneInput(),
InternalAlwaysCondition.INSTANCE,
null,
null,
Collections.emptyList(),
null,
new WatchStatus(-1L, null, null, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC), null, null, null),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);

engine.register(events -> {
for (TriggerEvent ignored : events) {
if (firstExecuted.get() == false) {
logger.info("job first fire");
firstExecuted.set(true);
firstLatch.countDown();
} else {
logger.info("job second fire");
secondLatch.countDown();
}
}
});

engine.start(Collections.emptyList());
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
engine.add(watch);

advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}

advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}
engine.stop();
}

public void testAddWithNoLastCheckedTimeButHasActivationTimeExecutesBeforeInitialInterval() throws Exception {
final var firstLatch = new CountDownLatch(1);
final var secondLatch = new CountDownLatch(1);

var firstExecuted = new AtomicBoolean(false);

Watch watch = new Watch(
"watch",
new ScheduleTrigger(interval("1s")),
new ExecutableNoneInput(),
InternalAlwaysCondition.INSTANCE,
null,
null,
Collections.emptyList(),
null,
new WatchStatus(
-1L,
new WatchStatus.State(true, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC)),
null,
null,
null,
null,
null
),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);

engine.register(events -> {
for (TriggerEvent ignored : events) {
if (firstExecuted.get() == false) {
logger.info("job first fire");
firstExecuted.set(true);
firstLatch.countDown();
} else {
logger.info("job second fire");
secondLatch.countDown();
}
}
});

engine.start(Collections.emptyList());
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
engine.add(watch);

advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}

advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have short comments on each of these 4 tests describing what they're for. It took me a few minutes to figure out how they all differed. And in the case of this test, are you trying to show that it does not execute too many times if you advance the clock a good bit? Is it worth adding another latch or two to prove that the watch hasn't run when you expect it not to have run yet?

if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
fail("waiting too long for all watches to be triggered");
}
engine.stop();
}

private Watch createWatch(String name, Schedule schedule) {
return new Watch(
name,
Expand Down