Skip to content

Commit f999a46

Browse files
committed
Adds additional rollup delay tests
Signed-off-by: Clay Downs <downsrob@amazon.com>
1 parent 659dfbc commit f999a46

File tree

5 files changed

+23
-18
lines changed

5 files changed

+23
-18
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,7 @@ data class Rollup(
348348
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
349349
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
350350
if (schedule is IntervalSchedule) {
351-
schedule = if (schedule.delay == null) {
352-
IntervalSchedule(Instant.now(), schedule.interval, schedule.unit)
353-
} else {
354-
IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay)
355-
}
351+
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
356352
}
357353
}
358354
return Rollup(

src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class RollupTests : OpenSearchTestCase() {
106106
randomRollup().copy(delay = null)
107107
}
108108

109-
fun `test rollup applies to continuous rollups only`() {
109+
fun `test delay applies to continuous rollups only`() {
110110
// Continuous rollup schedule matches delay
111111
val newDelay: Long = 500
112112
val continuousRollup = randomRollup().copy(

src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class WriteableTests : OpenSearchTestCase() {
128128
}
129129

130130
fun `test rollup as stream`() {
131-
val rollup = randomRollup()
131+
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
132132
val out = BytesStreamOutput().also { rollup.writeTo(it) }
133133
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
134134
val streamedRollup = Rollup(sin)

src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class XContentTests : OpenSearchTestCase() {
126126
}
127127

128128
fun `test rollup parsing with type`() {
129-
val rollup = randomRollup()
129+
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
130130
val rollupString = rollup.toJsonString()
131131
val parser = parserWithType(rollupString)
132132
val parsedRollup = parser.parseWithType(rollup.id, rollup.seqNo, rollup.primaryTerm, Rollup.Companion::parse)
@@ -135,7 +135,7 @@ class XContentTests : OpenSearchTestCase() {
135135
}
136136

137137
fun `test rollup parsing without type`() {
138-
val rollup = randomRollup()
138+
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
139139
val rollupString = rollup.toJsonString(XCONTENT_WITHOUT_TYPE)
140140
val parsedRollup = Rollup.parse(parser(rollupString), rollup.id, rollup.seqNo, rollup.primaryTerm)
141141
// roles are deprecated and not populated in toXContent and parsed as part of parse

src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -577,15 +577,15 @@ class RollupRunnerIT : RollupRestTestCase() {
577577
// Define rollup
578578
var rollup = randomRollup().copy(
579579
enabled = true,
580-
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES, 0),
580+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
581581
jobEnabledTime = Instant.now(),
582582
sourceIndex = indexName,
583583
metadataID = null,
584584
continuous = true,
585585
delay = delay,
586586
dimensions = listOf(
587587
randomCalendarDateHistogram().copy(
588-
calendarInterval = "1s"
588+
calendarInterval = "5s"
589589
)
590590
)
591591
)
@@ -598,7 +598,7 @@ class RollupRunnerIT : RollupRestTestCase() {
598598
// Create rollup job
599599
rollup = createRollup(rollup = rollup, rollupId = rollup.id)
600600

601-
val nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
601+
var nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
602602
val expectedExecutionTime = rollup.jobEnabledTime!!.plusMillis(delay).toEpochMilli()
603603
val delayIsCorrect = ((expectedExecutionTime - nextExecutionTime) > -500) && ((expectedExecutionTime - nextExecutionTime) < 500)
604604
assertTrue("Delay was not correctly applied", delayIsCorrect)
@@ -609,20 +609,29 @@ class RollupRunnerIT : RollupRestTestCase() {
609609
// Still should not have run at this point
610610
assertFalse("Target rollup index was created before the delay should allow", indexExists(rollup.targetIndex))
611611
}
612-
613-
waitFor {
612+
val rollupMetadata = waitFor {
614613
assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex))
615614
val rollupJob = getRollup(rollupId = rollup.id)
616615
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
617616
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
618617
assertNotNull("Rollup metadata not found", rollupMetadata)
619-
val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime
620-
assertTrue("Rollup window not excluding the delay", nextWindowStartTime < Instant.now().minusMillis(delay))
621-
assertTrue("Rollup window not updating", nextWindowStartTime > Instant.now().minusMillis(delay + 2000))
618+
rollupMetadata
622619
}
620+
nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
621+
val nextExecutionOffset = (nextExecutionTime - Instant.now().toEpochMilli()) - 60000
622+
val nextExecutionIsCorrect = nextExecutionOffset < 5000 && nextExecutionOffset > -5000
623+
assertTrue("Next execution time not updated correctly", nextExecutionIsCorrect)
624+
val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime
625+
val nextWindowEndTime: Instant = rollupMetadata.continuous!!.nextWindowEndTime
626+
// Assert that after the window was updated, it falls approximately around 'now'
627+
assertTrue("Rollup window start time is incorrect", nextWindowStartTime.plusMillis(delay).minusMillis(1000) < Instant.now())
628+
assertTrue("Rollup window end time is incorrect", nextWindowEndTime.plusMillis(delay).plusMillis(1000) > Instant.now())
629+
630+
// window length should be 5 seconds
631+
val expectedWindowEnd = nextWindowStartTime.plusMillis(5000)
632+
assertEquals("Rollup window length applied incorrectly", expectedWindowEnd, nextWindowEndTime)
623633
}
624634

625-
// Tests that the non continuous delay does nothing
626635
fun `test non continuous delay does nothing`() {
627636
generateNYCTaxiData("source_runner_ninth")
628637

0 commit comments

Comments
 (0)