Skip to content

Commit 89031ce

Browse files
committed
fixed rollupFieldValueExpressionResolverTests
Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
1 parent 9c01d54 commit 89031ce

File tree

4 files changed

+56
-33
lines changed

4 files changed

+56
-33
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,16 @@ class RollupMapperService(
7878
@Suppress("ReturnCount")
7979
suspend fun targetIndexIsValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean {
8080

81-
if (!RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) {
81+
if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
8282
return false
8383
}
8484
// All other backing indices have to have this rollup job in _META field
85-
val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName)
85+
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(targetIndexResolvedName)
8686
backingIndices?.forEach {
8787
if (it.index.name != targetIndexResolvedName) {
8888
when (jobExistsInRollupIndex(rollup, it.index.name)) {
8989
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return false
90+
else -> {}
9091
}
9192
}
9293
}

src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ object RollupFieldValueExpressionResolver {
2323

2424
private lateinit var scriptService: ScriptService
2525
private lateinit var clusterService: ClusterService
26+
lateinit var indexAliasUtils: IndexAliasUtils
2627
fun resolve(rollup: Rollup, fieldValue: String): String {
2728
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())
2829

@@ -34,36 +35,39 @@ object RollupFieldValueExpressionResolver {
3435
.newInstance(script.params + mapOf("ctx" to contextMap))
3536
.execute()
3637

37-
if (isAlias(compiledValue)) {
38-
compiledValue = getWriteIndexNameForAlias(compiledValue)
38+
if (indexAliasUtils.isAlias(compiledValue)) {
39+
compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue)
3940
}
4041

4142
return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
4243
}
4344

44-
fun registerScriptService(scriptService: ScriptService) {
45+
fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
4546
this.scriptService = scriptService
47+
this.clusterService = clusterService
48+
this.indexAliasUtils = IndexAliasUtils(clusterService)
4649
}
47-
fun hasAlias(index: String): Boolean {
48-
val aliases = clusterService.state().metadata().indices.get(index)?.aliases
49-
if (aliases != null) {
50-
return aliases.size() > 0
50+
51+
class IndexAliasUtils(val clusterService: ClusterService) {
52+
53+
fun hasAlias(index: String): Boolean {
54+
val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases
55+
if (aliases != null) {
56+
return aliases.size() > 0
57+
}
58+
return false
5159
}
52-
return false
53-
}
54-
fun isAlias(index: String): Boolean {
55-
return clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
56-
}
57-
fun getWriteIndexNameForAlias(alias: String): String? {
58-
return clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
59-
}
6060

61-
fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
62-
return clusterService.state().metadata().indicesLookup?.get(alias)?.indices
63-
}
61+
fun isAlias(index: String): Boolean {
62+
return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
63+
}
6464

65-
fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
66-
this.scriptService = scriptService
67-
this.clusterService = clusterService
65+
fun getWriteIndexNameForAlias(alias: String): String? {
66+
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
67+
}
68+
69+
fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
70+
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices
71+
}
6872
}
6973
}

src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ class RollupRunnerIT : RollupRestTestCase() {
734734
}
735735

736736
fun `test rollup action with alias as target_index successfully`() {
737-
generateNYCTaxiData("source_runner_sixth")
737+
generateNYCTaxiData("source_runner_sixth_eleventh")
738738

739739
// Create index with alias, without mappings
740740
val indexAlias = "alias_as_target_index"
@@ -750,14 +750,14 @@ class RollupRunnerIT : RollupRestTestCase() {
750750
refreshAllIndices()
751751

752752
val rollup = Rollup(
753-
id = "page_size_runner_sixth",
753+
id = "runner_with_alias_as_target",
754754
schemaVersion = 1L,
755755
enabled = true,
756756
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
757757
jobLastUpdatedTime = Instant.now(),
758758
jobEnabledTime = Instant.now(),
759759
description = "basic change of page size",
760-
sourceIndex = "source_runner_sixth",
760+
sourceIndex = "source_runner_sixth_eleventh",
761761
targetIndex = indexAlias,
762762
metadataID = null,
763763
roles = emptyList(),
@@ -774,6 +774,7 @@ class RollupRunnerIT : RollupRestTestCase() {
774774
)
775775
).let { createRollup(it, it.id) }
776776

777+
// First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META
777778
updateRollupStartTime(rollup)
778779

779780
waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex)) }
@@ -785,13 +786,18 @@ class RollupRunnerIT : RollupRestTestCase() {
785786
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
786787
rollupJob
787788
}
789+
var rollupMetadataID = startedRollup.metadataID!!
790+
var rollupMetadata = getRollupMetadata(rollupMetadataID)
791+
assertEquals("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0)
788792

789793
// restart job
790794
client().makeRequest(
791795
"PUT",
792796
"$ROLLUP_JOBS_BASE_URI/${startedRollup.id}?if_seq_no=${startedRollup.seqNo}&if_primary_term=${startedRollup.primaryTerm}",
793797
emptyMap(), rollup.copy(enabled = true).toHttpEntity()
794798
)
799+
// Second run, backing index is setup just like any other rollup index
800+
updateRollupStartTime(rollup)
795801

796802
startedRollup = waitFor {
797803
val rollupJob = getRollup(rollupId = rollup.id)
@@ -801,11 +807,10 @@ class RollupRunnerIT : RollupRestTestCase() {
801807
rollupJob
802808
}
803809

804-
val rollupMetadataID = startedRollup.metadataID!!
805-
val rollupMetadata = getRollupMetadata(rollupMetadataID)
810+
rollupMetadataID = startedRollup.metadataID!!
811+
rollupMetadata = getRollupMetadata(rollupMetadataID)
806812

807-
// Randomly choosing 100.. if it didn't work we'd either fail hitting the timeout in waitFor or we'd have thousands of pages processed
808-
assertTrue("Did not have less than 100 pages processed", rollupMetadata.stats.documentsProcessed > 0)
813+
assertEquals("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0)
809814
}
810815

811816
// TODO: Test scenarios:

src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import com.nhaarman.mockitokotlin2.mock
1111
import com.nhaarman.mockitokotlin2.whenever
1212
import com.nhaarman.mockitokotlin2.doReturn
1313
import org.junit.Before
14+
import org.opensearch.cluster.service.ClusterService
1415
import org.opensearch.indexmanagement.rollup.randomRollup
1516
import org.opensearch.ingest.TestTemplateService
1617
import org.opensearch.script.ScriptService
@@ -20,19 +21,31 @@ import org.opensearch.test.OpenSearchTestCase
2021
class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() {
2122

2223
private val scriptService: ScriptService = mock()
23-
24+
private val clusterService: ClusterService = mock()
25+
private val indexAliasUtils: RollupFieldValueExpressionResolver.IndexAliasUtils = mock()
2426
@Before
2527
fun settings() {
26-
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
28+
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
29+
clusterService.state()
2730
}
2831

29-
fun `test resolving successfully`() {
32+
fun `test resolving no alias successfully`() {
3033
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123"))
34+
whenever(indexAliasUtils.hasAlias(any())).doReturn(false)
3135
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
3236
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
3337
assertEquals("test_index_123", targetIndexResolvedName)
3438
}
3539

40+
fun `test resolving with alias successfully`() {
41+
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123"))
42+
whenever(indexAliasUtils.hasAlias(any())).doReturn(true)
43+
whenever(indexAliasUtils.getWriteIndexNameForAlias(any())).doReturn("backing_index")
44+
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
45+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
46+
assertEquals("backing_index", targetIndexResolvedName)
47+
}
48+
3649
fun `test resolving failed returned passed value`() {
3750
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory(""))
3851
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")

0 commit comments

Comments
 (0)