@@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.transform.model.Transform
1919import org.opensearch.indexmanagement.transform.model.TransformMetadata
2020import org.opensearch.indexmanagement.waitFor
2121import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
22+ import org.opensearch.rest.RestRequest
2223import org.opensearch.rest.RestStatus
2324import org.opensearch.script.Script
2425import org.opensearch.script.ScriptType
@@ -204,6 +205,82 @@ class TransformRunnerIT : TransformRestTestCase() {
204205 assertTrue(" Didn't capture search time" , metadata.stats.searchTimeInMillis > 0 )
205206 }
206207
208+ fun `test transform target index _doc_count against the source index _doc_count` () {
209+ val sourceIdxTestName = " source_idx_test"
210+ val targetIdxTestName = " target_idx_test"
211+
212+ val storeAndForwardTerm = " store_and_fwd_flag"
213+ val fareAmount = " fare_amount"
214+ val avgAmountPerFlag = " avg_amount_per_store_flag"
215+
216+ validateSourceIndex(sourceIdxTestName)
217+
218+ val transform = Transform (
219+ id = " id_13" ,
220+ schemaVersion = 1L ,
221+ enabled = true ,
222+ enabledAt = Instant .now(),
223+ updatedAt = Instant .now(),
224+ jobSchedule = IntervalSchedule (Instant .now(), 1 , ChronoUnit .MINUTES ),
225+ description = " test transform doc values must be the same" ,
226+ metadataId = null ,
227+ sourceIndex = sourceIdxTestName,
228+ targetIndex = targetIdxTestName,
229+ roles = emptyList(),
230+ pageSize = 1 ,
231+ groups = listOf (
232+ Terms (sourceField = storeAndForwardTerm, targetField = storeAndForwardTerm)
233+ ),
234+ aggregations = AggregatorFactories .builder().addAggregator(AggregationBuilders .avg(fareAmount).field(fareAmount))
235+ ).let { createTransform(it, it.id) }
236+
237+ updateTransformStartTime(transform)
238+
239+ waitFor {
240+ assertTrue(" Target transform index was not created" , indexExists(transform.targetIndex))
241+ }
242+
243+ waitFor {
244+ val transformJob = getTransform(transformId = transform.id)
245+ assertNotNull(" Transform job doesn't have metadata set" , transformJob.metadataId)
246+ val transformMetadata = getTransformMetadata(transformJob.metadataId!! )
247+ assertEquals(" Transform is not finished" , TransformMetadata .Status .FINISHED , transformMetadata.status)
248+
249+ var req = """
250+ {
251+ "size": 0,
252+ "aggs": {
253+ "$avgAmountPerFlag ": {
254+ "terms": {
255+ "field": "$storeAndForwardTerm ", "order": { "_key": "asc" }
256+ },
257+ "aggs": {
258+ "avg": { "avg": { "field": "$fareAmount " } } }
259+ }
260+ }
261+ }
262+ """ .trimIndent()
263+
264+ var rawRes = client().makeRequest(RestRequest .Method .POST .name, " /$sourceIdxTestName /_search" , emptyMap(), StringEntity (req, ContentType .APPLICATION_JSON ))
265+ assertTrue(rawRes.restStatus() == RestStatus .OK )
266+
267+ var transformRes = client().makeRequest(RestRequest .Method .POST .name, " /$targetIdxTestName /_search" , emptyMap(), StringEntity (req, ContentType .APPLICATION_JSON ))
268+ assertTrue(transformRes.restStatus() == RestStatus .OK )
269+
270+ val rawAggBuckets = (rawRes.asMap()[" aggregations" ] as Map <String , Map <String , List <Map <String , Map <String , Any >>>>>)[avgAmountPerFlag]!! [" buckets" ]!!
271+ val transformAggBuckets = (transformRes.asMap()[" aggregations" ] as Map <String , Map <String , List <Map <String , Map <String , Any >>>>>)[avgAmountPerFlag]!! [" buckets" ]!!
272+
273+ assertEquals(" Different bucket sizes" , rawAggBuckets.size, transformAggBuckets.size)
274+ rawAggBuckets.forEachIndexed { idx, rawAggBucket ->
275+ val transformAggBucket = transformAggBuckets[idx]
276+ assertEquals(
277+ " The doc_count had a different value raw[$rawAggBucket ] transform[$transformAggBucket ]" ,
278+ rawAggBucket[" doc_count" ]!! , transformAggBucket[" doc_count" ]!!
279+ )
280+ }
281+ }
282+ }
283+
207284 fun `test transform with failure during indexing` () {
208285 validateSourceIndex(" transform-source-index" )
209286
@@ -497,7 +574,7 @@ class TransformRunnerIT : TransformRestTestCase() {
497574 assertEquals(" Request failed" , RestStatus .OK , response.restStatus())
498575 val responseHits = response.asMap().getValue(" hits" ) as Map <* , * >
499576 val totalDocs = (responseHits[" hits" ] as ArrayList <* >).fold(0 ) { sum, bucket ->
500- val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" transform. _doc_count" ] as Int
577+ val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" _doc_count" ] as Int
501578 sum + docCount
502579 }
503580 assertEquals(" Not all documents included in the transform target index" , 5000 , totalDocs)
@@ -548,7 +625,7 @@ class TransformRunnerIT : TransformRestTestCase() {
548625 assertEquals(" Request failed" , RestStatus .OK , response.restStatus())
549626 val responseHits = response.asMap().getValue(" hits" ) as Map <* , * >
550627 val totalDocs = (responseHits[" hits" ] as ArrayList <* >).fold(0 ) { sum, bucket ->
551- val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" transform. _doc_count" ] as Int
628+ val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" _doc_count" ] as Int
552629 sum + docCount
553630 }
554631 assertEquals(" Not all documents included in the transform target index" , 10000 , totalDocs)
@@ -626,7 +703,7 @@ class TransformRunnerIT : TransformRestTestCase() {
626703 assertEquals(" Request failed" , RestStatus .OK , response.restStatus())
627704 val responseHits = response.asMap().getValue(" hits" ) as Map <* , * >
628705 val totalDocs = (responseHits[" hits" ] as ArrayList <* >).fold(0 ) { sum, bucket ->
629- val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" transform. _doc_count" ] as Int
706+ val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" _doc_count" ] as Int
630707 sum + docCount
631708 }
632709 assertEquals(" Not all documents included in the transform target index" , 48 , totalDocs)
@@ -686,7 +763,7 @@ class TransformRunnerIT : TransformRestTestCase() {
686763 assertEquals(" Request failed" , RestStatus .OK , response.restStatus())
687764 val responseHits = response.asMap().getValue(" hits" ) as Map <* , * >
688765 val totalDocs = (responseHits[" hits" ] as ArrayList <* >).fold(0 ) { sum, bucket ->
689- val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" transform. _doc_count" ] as Int
766+ val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" _doc_count" ] as Int
690767 sum + docCount
691768 }
692769 assertEquals(" Not all documents included in the transform target index" , 100 , totalDocs)
@@ -843,7 +920,7 @@ class TransformRunnerIT : TransformRestTestCase() {
843920 assertEquals(" Request failed" , RestStatus .OK , response.restStatus())
844921 val responseHits = response.asMap().getValue(" hits" ) as Map <* , * >
845922 val totalDocs = (responseHits[" hits" ] as ArrayList <* >).fold(0 ) { sum, bucket ->
846- val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" transform. _doc_count" ] as Int
923+ val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" _doc_count" ] as Int
847924 sum + docCount
848925 }
849926 assertEquals(" Not all documents included in the transform target index" , 52 , totalDocs)
@@ -917,7 +994,7 @@ class TransformRunnerIT : TransformRestTestCase() {
917994 assertEquals(" Request failed" , RestStatus .OK , response.restStatus())
918995 val responseHits = response.asMap().getValue(" hits" ) as Map <* , * >
919996 val totalDocs = (responseHits[" hits" ] as ArrayList <* >).fold(0 ) { sum, bucket ->
920- val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" transform. _doc_count" ] as Int
997+ val docCount = ((bucket as Map <* , * >)[" _source" ] as Map <* , * >)[" _doc_count" ] as Int
921998 sum + docCount
922999 }
9231000 assertEquals(" Not all documents included in the transform target index" , 88 , totalDocs)
0 commit comments