Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ data class Transform(
return if (includeId) {
mutableMapOf(
TRANSFORM_DOC_ID_FIELD to this.id,
_DOC_COUNT to docCount,
TRANSFORM_DOC_COUNT_FIELD to docCount
)
} else {
mutableMapOf(
_DOC_COUNT to docCount,
TRANSFORM_DOC_COUNT_FIELD to docCount
)
}
Expand Down Expand Up @@ -286,6 +288,8 @@ data class Transform(
const val MAXIMUM_PAGE_SIZE_CONTINUOUS = 1_000
const val MINIMUM_JOB_INTERVAL = 1
const val TRANSFORM_DOC_ID_FIELD = "$TRANSFORM_TYPE._id"
const val _DOC_COUNT = "_doc_count"
// Kept for backward compatibility: the prefixed doc count field is still written alongside _DOC_COUNT
const val TRANSFORM_DOC_COUNT_FIELD = "$TRANSFORM_TYPE._doc_count"
const val CONTINUOUS_FIELD = "continuous"
const val USER_FIELD = "user"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
Expand Down Expand Up @@ -204,6 +205,82 @@ class TransformRunnerIT : TransformRestTestCase() {
assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0)
}

/**
 * Checks that a terms aggregation run against the transform target index reports the same
 * per-bucket `doc_count` values as the identical aggregation run against the source index.
 *
 * Flow:
 * 1. Create a transform that groups `source_idx_test` by `store_and_fwd_flag` and averages
 *    `fare_amount`, writing into `target_idx_test`.
 * 2. Wait for the target index to exist and for the transform metadata to report FINISHED.
 * 3. Send the same search request to both indices and compare the buckets pairwise.
 *
 * NOTE(review): the target side presumably only matches because transformed documents carry a
 * `_doc_count` field that the aggregation honours - confirm against the transform document model.
 */
@Suppress("UNCHECKED_CAST")
fun `test transform target index _doc_count against the source index _doc_count`() {
    val sourceIdxTestName = "source_idx_test"
    val targetIdxTestName = "target_idx_test"

    val storeAndForwardTerm = "store_and_fwd_flag"
    val fareAmount = "fare_amount"
    val avgAmountPerFlag = "avg_amount_per_store_flag"

    // NOTE(review): presumably creates/populates the source index if needed - confirm in TransformRestTestCase.
    validateSourceIndex(sourceIdxTestName)

    val transform = Transform(
        id = "id_13",
        schemaVersion = 1L,
        enabled = true,
        enabledAt = Instant.now(),
        updatedAt = Instant.now(),
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        description = "test transform doc values must be the same",
        metadataId = null,
        sourceIndex = sourceIdxTestName,
        targetIndex = targetIdxTestName,
        roles = emptyList(),
        pageSize = 1,
        groups = listOf(
            Terms(sourceField = storeAndForwardTerm, targetField = storeAndForwardTerm)
        ),
        aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount))
    ).let { createTransform(it, it.id) }

    updateTransformStartTime(transform)

    waitFor {
        assertTrue("Target transform index was not created", indexExists(transform.targetIndex))
    }

    waitFor {
        val transformJob = getTransform(transformId = transform.id)
        assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId)
        val transformMetadata = getTransformMetadata(transformJob.metadataId!!)
        assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status)

        // Buckets are ordered by key so the two responses can be compared position by position.
        val req = """
            {
                "size": 0,
                "aggs": {
                    "$avgAmountPerFlag": {
                        "terms": {
                            "field": "$storeAndForwardTerm", "order": { "_key": "asc" }
                        },
                        "aggs": {
                            "avg": { "avg": { "field": "$fareAmount" } } }
                    }
                }
            }
        """.trimIndent()

        val rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
        assertEquals("Request failed", RestStatus.OK, rawRes.restStatus())

        val transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
        assertEquals("Request failed", RestStatus.OK, transformRes.restStatus())

        // Each bucket is a flat JSON object; "doc_count" is read as a scalar below, so the
        // bucket value type is Any rather than a nested map.
        val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Any>>>>)[avgAmountPerFlag]!!["buckets"]!!
        val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Any>>>>)[avgAmountPerFlag]!!["buckets"]!!

        assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size)
        rawAggBuckets.forEachIndexed { idx, rawAggBucket ->
            val transformAggBucket = transformAggBuckets[idx]
            assertEquals(
                "The doc_count had a different value raw[$rawAggBucket] transform[$transformAggBucket]",
                rawAggBucket["doc_count"]!!, transformAggBucket["doc_count"]!!
            )
        }
    }
}

fun `test transform with failure during indexing`() {
validateSourceIndex("transform-source-index")

Expand Down Expand Up @@ -497,7 +574,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 5000, totalDocs)
Expand Down Expand Up @@ -548,7 +625,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 10000, totalDocs)
Expand Down Expand Up @@ -626,7 +703,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 48, totalDocs)
Expand Down Expand Up @@ -686,7 +763,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 100, totalDocs)
Expand Down Expand Up @@ -843,7 +920,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 52, totalDocs)
Expand Down Expand Up @@ -917,7 +994,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 88, totalDocs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class RestPreviewTransformActionIT : TransformRestTestCase() {
emptyMap(),
transform.toHttpEntity()
)
val expectedKeys = setOf("revenue", "passengerCount", "flag", "transform._doc_count")
val expectedKeys = setOf("revenue", "passengerCount", "flag", "transform._doc_count", "_doc_count")
assertEquals("Preview transform failed", RestStatus.OK, response.restStatus())
val transformedDocs = response.asMap()["documents"] as List<Map<String, Any>>
assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys)
Expand Down