Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,14 @@ class RollupMapperService(
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
}

val indexMapping: MappingMetadata = res.mappings[index][_DOC]
val indexMappingSource = indexMapping.sourceAsMap
val indexTypeMappings = res.mappings[index]
if (indexTypeMappings.isEmpty) {
return RollupJobValidationResult.Invalid("Source index [$index] mappings are empty, cannot validate the job.")
}

// Starting from 6.0.0 an index can only have one mapping type, but mapping type is still part of the APIs in 7.x, allowing users to
// set a custom mapping type. As a result using first mapping type found instead of _DOC mapping type to validate
val indexMappingSource = indexTypeMappings.first().value.sourceAsMap

val issues = mutableSetOf<String>()
// Validate source fields in dimensions
Expand Down Expand Up @@ -195,7 +201,9 @@ class RollupMapperService(
RollupJobValidationResult.Invalid("Invalid mappings for index [$index] because $issues")
}
} catch (e: Exception) {
return RollupJobValidationResult.Failure("Failed to validate the source index mappings", e)
val errorMessage = "Failed to validate the source index mappings"
logger.error(errorMessage, e)
return RollupJobValidationResult.Failure(errorMessage, e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,46 @@ class RollupMapperServiceTests : OpenSearchTestCase() {
val client = getClient(
getAdminClient(
getIndicesAdminClient(
getMappingsResponse = getKibanaSampleDataMappingResponse(sourceIndex),
getMappingsResponse = getMappingResponse(sourceIndex),
getMappingsException = null
)
)
)

val clusterService = getClusterService()
val indexNameExpressionResolver = getIndexNameExpressionResolver(listOf(sourceIndex))
val mapperService = RollupMapperService(client, clusterService, indexNameExpressionResolver)

runBlocking {
val sourceIndexValidationResult = mapperService.isSourceIndexValid(rollup)
require(sourceIndexValidationResult is RollupJobValidationResult.Valid) { "Source index validation returned unexpected results" }
}
}

fun `test source index validation with custom type`() {
val sourceIndex = "test-index"

val dimensions = listOf(
randomDateHistogram().copy(
sourceField = "order_date"
)
)
val metrics = listOf(
randomRollupMetrics().copy(
sourceField = "total_quantity"
)
)
val rollup = randomRollup().copy(
enabled = true,
jobEnabledTime = Instant.now(),
dimensions = dimensions,
metrics = metrics
)

val client = getClient(
getAdminClient(
getIndicesAdminClient(
getMappingsResponse = getMappingResponse(sourceIndex, "custom_type"),
getMappingsException = null
)
)
Expand All @@ -89,6 +128,44 @@ class RollupMapperServiceTests : OpenSearchTestCase() {
}
}

fun `test source index validation with empty mappings`() {
val sourceIndex = "test-index"

val dimensions = listOf(
randomDateHistogram().copy(
sourceField = "order_date"
)
)
val metrics = listOf(
randomRollupMetrics().copy(
sourceField = "total_quantity"
)
)
val rollup = randomRollup().copy(
enabled = true,
jobEnabledTime = Instant.now(),
dimensions = dimensions,
metrics = metrics
)

val client = getClient(
getAdminClient(
getIndicesAdminClient(
getMappingsResponse = getMappingResponse(sourceIndex, "custom_type", true),
getMappingsException = null
)
)
)
val clusterService = getClusterService()
val indexNameExpressionResolver = getIndexNameExpressionResolver(listOf(sourceIndex))
val mapperService = RollupMapperService(client, clusterService, indexNameExpressionResolver)

runBlocking {
val sourceIndexValidationResult = mapperService.isSourceIndexValid(rollup)
require(sourceIndexValidationResult is RollupJobValidationResult.Invalid) { "Source index validation returned unexpected results" }
}
}

fun `test source index validation with subfield`() {
val sourceIndex = "test-index"

Expand All @@ -112,11 +189,12 @@ class RollupMapperServiceTests : OpenSearchTestCase() {
val client = getClient(
getAdminClient(
getIndicesAdminClient(
getMappingsResponse = getKibanaSampleDataMappingResponse(sourceIndex),
getMappingsResponse = getMappingResponse(sourceIndex),
getMappingsException = null
)
)
)

val clusterService = getClusterService()
val indexNameExpressionResolver = getIndexNameExpressionResolver(listOf(sourceIndex))
val mapperService = RollupMapperService(client, clusterService, indexNameExpressionResolver)
Expand Down Expand Up @@ -150,11 +228,12 @@ class RollupMapperServiceTests : OpenSearchTestCase() {
val client = getClient(
getAdminClient(
getIndicesAdminClient(
getMappingsResponse = getKibanaSampleDataMappingResponse(sourceIndex),
getMappingsResponse = getMappingResponse(sourceIndex),
getMappingsException = null
)
)
)

val clusterService = getClusterService()
val indexNameExpressionResolver = getIndexNameExpressionResolver(listOf(sourceIndex))
val mapperService = RollupMapperService(client, clusterService, indexNameExpressionResolver)
Expand Down Expand Up @@ -183,11 +262,12 @@ class RollupMapperServiceTests : OpenSearchTestCase() {
val client = getClient(
getAdminClient(
getIndicesAdminClient(
getMappingsResponse = getKibanaSampleDataMappingResponse(sourceIndex),
getMappingsResponse = getMappingResponse(sourceIndex),
getMappingsException = null
)
)
)

val clusterService = getClusterService()
val indexNameExpressionResolver = getIndexNameExpressionResolver(listOf(sourceIndex))
val mapperService = RollupMapperService(client, clusterService, indexNameExpressionResolver)
Expand Down Expand Up @@ -232,16 +312,19 @@ class RollupMapperServiceTests : OpenSearchTestCase() {
private fun getIndexNameExpressionResolver(concreteIndices: List<String>): IndexNameExpressionResolver =
mock { on { concreteIndexNames(any(), any(), anyBoolean(), anyVararg<String>()) } doReturn concreteIndices.toTypedArray() }

private fun getKibanaSampleDataMappingResponse(indexName: String): GetMappingsResponse {
val mappingSourceMap = createParser(
XContentType.JSON.xContent(),
javaClass.classLoader.getResource("mappings/kibana-sample-data.json").readText()
).map()
val mappingMetadata = MappingMetadata(_DOC, mappingSourceMap)

val docMappings = ImmutableOpenMap.Builder<String, MappingMetadata>()
.fPut(_DOC, mappingMetadata)
.build()
private fun getMappingResponse(indexName: String, mappingType: String = _DOC, emptyMapping: Boolean = false): GetMappingsResponse {
val docMappings = if (emptyMapping) {
ImmutableOpenMap.Builder<String, MappingMetadata>().build()
} else {
val mappingSourceMap = createParser(
XContentType.JSON.xContent(),
javaClass.classLoader.getResource("mappings/kibana-sample-data.json").readText()
).map()
val mappingMetadata = MappingMetadata(mappingType, mappingSourceMap)
ImmutableOpenMap.Builder<String, MappingMetadata>()
.fPut(mappingType, mappingMetadata)
.build()
}

val mappings = ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetadata>>()
.fPut(indexName, docMappings)
Expand Down