Skip to content

Commit f20a07a

Browse files
committed
Adds implementation for the delay feature in rollup jobs (opensearch-project#147)
* Adds delay implementation for rollup jobs * Removes non-continuous delay implementation * Adds additional rollup delay tests Signed-off-by: Clay Downs <downsrob@amazon.com>
1 parent 1d8fbd4 commit f20a07a

File tree

10 files changed

+268
-14
lines changed

10 files changed

+268
-14
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ class RollupSearchService(
104104
logger.debug("Non-continuous job [${rollup.id}] is not processing next window [$metadata]")
105105
return false
106106
} else {
107-
return hasNextFullWindow(metadata) // TODO: Behavior when next full window but 0 docs/afterkey is null
107+
return hasNextFullWindow(rollup, metadata) // TODO: Behavior when next full window but 0 docs/afterkey is null
108108
}
109109
}
110110

111-
private fun hasNextFullWindow(metadata: RollupMetadata): Boolean {
112-
return Instant.now().isAfter(metadata.continuous!!.nextWindowEndTime) // TODO: !!
111+
private fun hasNextFullWindow(rollup: Rollup, metadata: RollupMetadata): Boolean {
112+
return Instant.now().isAfter(metadata.continuous!!.nextWindowEndTime.plusMillis(rollup.delay ?: 0)) // TODO: !!
113113
}
114114

115115
@Suppress("ComplexMethod")

src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ data class Rollup(
6161
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
6262
val enabled: Boolean,
6363
val schemaVersion: Long,
64-
val jobSchedule: Schedule,
64+
var jobSchedule: Schedule,
6565
val jobLastUpdatedTime: Instant,
6666
val jobEnabledTime: Instant?,
6767
val description: String,
@@ -83,12 +83,26 @@ data class Rollup(
8383
} else {
8484
require(jobEnabledTime == null) { "Job enabled time must not be present if the job is disabled" }
8585
}
86+
// Copy the delay parameter of the job into the job scheduler for continuous jobs only
87+
if (jobSchedule.delay != delay && continuous) {
88+
jobSchedule = when (jobSchedule) {
89+
is CronSchedule -> {
90+
val cronSchedule = jobSchedule as CronSchedule
91+
CronSchedule(cronSchedule.cronExpression, cronSchedule.timeZone, delay ?: 0)
92+
}
93+
is IntervalSchedule -> {
94+
val intervalSchedule = jobSchedule as IntervalSchedule
95+
IntervalSchedule(intervalSchedule.startTime, intervalSchedule.interval, intervalSchedule.unit, delay ?: 0)
96+
}
97+
else -> jobSchedule
98+
}
99+
}
86100
when (jobSchedule) {
87101
is CronSchedule -> {
88102
// Job scheduler already correctly throws errors for this
89103
}
90104
is IntervalSchedule -> {
91-
require(jobSchedule.interval >= MINIMUM_JOB_INTERVAL) { "Rollup job schedule interval must be greater than 0" }
105+
require((jobSchedule as IntervalSchedule).interval >= MINIMUM_JOB_INTERVAL) { "Rollup job schedule interval must be greater than 0" }
92106
}
93107
}
94108
require(sourceIndex != targetIndex) { "Your source and target index cannot be the same" }
@@ -97,7 +111,10 @@ data class Rollup(
97111
}
98112
require(dimensions.first().type == Dimension.Type.DATE_HISTOGRAM) { "The first dimension must be a date histogram" }
99113
require(pageSize in MINIMUM_PAGE_SIZE..MAXIMUM_PAGE_SIZE) { "Page size must be between 1 and 10,000" }
100-
if (delay != null) require(delay >= MINIMUM_DELAY) { "Delay must be non-negative if set" }
114+
if (delay != null) {
115+
require(delay >= MINIMUM_DELAY) { "Delay must be non-negative if set" }
116+
require(delay <= Instant.now().toEpochMilli()) { "Delay must be less than the current unix time" }
117+
}
101118
}
102119

103120
override fun isEnabled() = enabled
@@ -331,7 +348,7 @@ data class Rollup(
331348
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
332349
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
333350
if (schedule is IntervalSchedule) {
334-
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit)
351+
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
335352
}
336353
}
337354
return Rollup(

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 11
3+
"schema_version": 12
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -600,6 +600,9 @@
600600
"start_time": {
601601
"type": "date",
602602
"format": "strict_date_time||epoch_millis"
603+
},
604+
"schedule_delay": {
605+
"type": "long"
603606
}
604607
}
605608
},
@@ -610,6 +613,9 @@
610613
},
611614
"timezone": {
612615
"type": "keyword"
616+
},
617+
"schedule_delay": {
618+
"type": "long"
613619
}
614620
}
615621
}
@@ -792,6 +798,9 @@
792798
"start_time": {
793799
"type": "date",
794800
"format": "strict_date_time||epoch_millis"
801+
},
802+
"schedule_delay": {
803+
"type": "long"
795804
}
796805
}
797806
},
@@ -802,6 +811,9 @@
802811
},
803812
"timezone": {
804813
"type": "keyword"
814+
},
815+
"schedule_delay": {
816+
"type": "long"
805817
}
806818
}
807819
}
@@ -1046,6 +1058,9 @@
10461058
"start_time": {
10471059
"type": "date",
10481060
"format": "strict_date_time||epoch_millis"
1061+
},
1062+
"schedule_delay": {
1063+
"type": "long"
10491064
}
10501065
}
10511066
},
@@ -1056,6 +1071,9 @@
10561071
},
10571072
"timezone": {
10581073
"type": "keyword"
1074+
},
1075+
"schedule_delay": {
1076+
"type": "long"
10591077
}
10601078
}
10611079
}

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import javax.management.remote.JMXServiceURL
4646

4747
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
4848

49-
val configSchemaVersion = 11
49+
val configSchemaVersion = 12
5050
val historySchemaVersion = 3
5151

5252
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as

src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ fun randomRollup(): Rollup {
129129
metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10),
130130
roles = OpenSearchRestTestCase.randomList(10) { OpenSearchRestTestCase.randomAlphaOfLength(10) },
131131
pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000),
132-
delay = OpenSearchRestTestCase.randomNonNegativeLong(),
132+
delay = 0,
133133
continuous = OpenSearchRestTestCase.randomBoolean(),
134134
dimensions = randomRollupDimensions(),
135135
metrics = OpenSearchRestTestCase.randomList(20, ::randomRollupMetrics).distinctBy { it.targetField },

src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
package org.opensearch.indexmanagement.rollup.model
2828

2929
import org.opensearch.indexmanagement.randomInstant
30+
import org.opensearch.indexmanagement.randomSchedule
3031
import org.opensearch.indexmanagement.rollup.randomDateHistogram
3132
import org.opensearch.indexmanagement.rollup.randomRollup
3233
import org.opensearch.indexmanagement.rollup.randomTerms
@@ -95,9 +96,30 @@ class RollupTests : OpenSearchTestCase() {
9596
assertFailsWith(IllegalArgumentException::class, "Delay was negative") {
9697
randomRollup().copy(delay = -1)
9798
}
99+
assertFailsWith(IllegalArgumentException::class, "Delay was too high") {
100+
randomRollup().copy(delay = Long.MAX_VALUE)
101+
}
98102

99103
// These should successfully parse without exceptions
100104
randomRollup().copy(delay = 0)
101105
randomRollup().copy(delay = 930490)
106+
randomRollup().copy(delay = null)
107+
}
108+
109+
fun `test delay applies to continuous rollups only`() {
110+
// Continuous rollup schedule matches delay
111+
val newDelay: Long = 500
112+
val continuousRollup = randomRollup().copy(
113+
delay = newDelay,
114+
continuous = true
115+
)
116+
assertEquals(newDelay, continuousRollup.jobSchedule.delay)
117+
// Non continuous rollup schedule should have null delay
118+
val nonContinuousRollup = randomRollup().copy(
119+
jobSchedule = randomSchedule(),
120+
delay = newDelay,
121+
continuous = false
122+
)
123+
assertNull(nonContinuousRollup.jobSchedule.delay)
102124
}
103125
}

src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class WriteableTests : OpenSearchTestCase() {
128128
}
129129

130130
fun `test rollup as stream`() {
131-
val rollup = randomRollup()
131+
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
132132
val out = BytesStreamOutput().also { rollup.writeTo(it) }
133133
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
134134
val streamedRollup = Rollup(sin)

src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class XContentTests : OpenSearchTestCase() {
126126
}
127127

128128
fun `test rollup parsing with type`() {
129-
val rollup = randomRollup()
129+
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
130130
val rollupString = rollup.toJsonString()
131131
val parser = parserWithType(rollupString)
132132
val parsedRollup = parser.parseWithType(rollup.id, rollup.seqNo, rollup.primaryTerm, Rollup.Companion::parse)
@@ -135,7 +135,7 @@ class XContentTests : OpenSearchTestCase() {
135135
}
136136

137137
fun `test rollup parsing without type`() {
138-
val rollup = randomRollup()
138+
val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000))
139139
val rollupString = rollup.toJsonString(XCONTENT_WITHOUT_TYPE)
140140
val parsedRollup = Rollup.parse(parser(rollupString), rollup.id, rollup.seqNo, rollup.primaryTerm)
141141
// roles are deprecated and not populated in toXContent and parsed as part of parse

0 commit comments

Comments
 (0)