Skip to content

Commit 9c01d54

Browse files
committed
Multi rollup ndx search (#453)
* fixed failing rollupInterceptorIT test * reverted old error messages * reverted checking for matching jobs on whole set instead of job by job; Added picking rollup job deterministic * fixed sorting * added ITs for multi rollup index search * added ITs for multi rollup index search#2 * detekt fixes * changed index names and rollup job * detekt fix * empty commit to trigger test pipeline Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
1 parent 600df4f commit 9c01d54

File tree

2 files changed

+302
-0
lines changed

2 files changed

+302
-0
lines changed

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,20 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
138138
insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
139139
}
140140

141+
protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
142+
val shards = searchResponse.asMap()["_shards"] as Map<String, ArrayList<Map<String, Any>>>
143+
assertNotNull(shards)
144+
val failures = shards["failures"]
145+
assertNotNull(failures)
146+
return failures?.let {
147+
val result: ArrayList<Map<String, String>?>? = ArrayList()
148+
for (failure in it) {
149+
result?.add((failure as Map<String, Map<String, String>>)["reason"])
150+
}
151+
return result
152+
}
153+
}
154+
141155
companion object {
142156
internal interface IProxy {
143157
val version: String?

src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,4 +789,292 @@ class RollupInterceptorIT : RollupRestTestCase() {
789789
trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
790790
)
791791
}
792+
793+
fun `test rollup search multiple target indices successfully`() {
794+
val sourceIndex1 = "source_rollup_search_multi_jobs_1"
795+
val sourceIndex2 = "source_rollup_search_multi_jobs_2"
796+
generateNYCTaxiData(sourceIndex1)
797+
generateNYCTaxiData(sourceIndex2)
798+
val targetIndex1 = "target_rollup_search_multi_jobs1"
799+
val targetIndex2 = "target_rollup_search_multi_jobs2"
800+
val rollupHourly1 = Rollup(
801+
id = "hourly_basic_term_query_rollup_search_multi_1",
802+
enabled = true,
803+
schemaVersion = 1L,
804+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
805+
jobLastUpdatedTime = Instant.now(),
806+
jobEnabledTime = Instant.now(),
807+
description = "basic search test",
808+
sourceIndex = sourceIndex1,
809+
targetIndex = targetIndex1,
810+
metadataID = null,
811+
roles = emptyList(),
812+
pageSize = 10,
813+
delay = 0,
814+
continuous = false,
815+
dimensions = listOf(
816+
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
817+
Terms("RatecodeID", "RatecodeID"),
818+
Terms("PULocationID", "PULocationID")
819+
),
820+
metrics = listOf(
821+
RollupMetrics(
822+
sourceField = "passenger_count", targetField = "passenger_count",
823+
metrics = listOf(
824+
Sum(), Min(), Max(),
825+
ValueCount(), Average()
826+
)
827+
),
828+
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
829+
)
830+
).let { createRollup(it, it.id) }
831+
832+
updateRollupStartTime(rollupHourly1)
833+
834+
waitFor {
835+
val rollupJob = getRollup(rollupId = rollupHourly1.id)
836+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
837+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
838+
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
839+
}
840+
841+
val rollupHourly2 = Rollup(
842+
id = "hourly_basic_term_query_rollup_search_multi_2",
843+
enabled = true,
844+
schemaVersion = 1L,
845+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
846+
jobLastUpdatedTime = Instant.now(),
847+
jobEnabledTime = Instant.now(),
848+
description = "basic search test",
849+
sourceIndex = sourceIndex2,
850+
targetIndex = targetIndex2,
851+
metadataID = null,
852+
roles = emptyList(),
853+
pageSize = 10,
854+
delay = 0,
855+
continuous = false,
856+
dimensions = listOf(
857+
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
858+
Terms("RatecodeID", "RatecodeID"),
859+
Terms("PULocationID", "PULocationID")
860+
),
861+
metrics = listOf(
862+
RollupMetrics(
863+
sourceField = "passenger_count", targetField = "passenger_count",
864+
metrics = listOf(
865+
Sum(), Min(), Max(),
866+
ValueCount(), Average()
867+
)
868+
),
869+
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
870+
)
871+
).let { createRollup(it, it.id) }
872+
873+
updateRollupStartTime(rollupHourly2)
874+
875+
waitFor {
876+
val rollupJob = getRollup(rollupId = rollupHourly2.id)
877+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
878+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
879+
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
880+
}
881+
882+
refreshAllIndices()
883+
884+
val req = """
885+
{
886+
"size": 0,
887+
"query": {
888+
"term": { "RatecodeID": 1 }
889+
},
890+
"aggs": {
891+
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
892+
"max_passenger_count": { "max": { "field": "passenger_count" } },
893+
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
894+
}
895+
}
896+
""".trimIndent()
897+
val rawRes1 = client().makeRequest("POST", "/$sourceIndex1/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
898+
assertTrue(rawRes1.restStatus() == RestStatus.OK)
899+
val rawRes2 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
900+
assertTrue(rawRes2.restStatus() == RestStatus.OK)
901+
val rollupResMulti = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
902+
assertTrue(rollupResMulti.restStatus() == RestStatus.OK)
903+
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
904+
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
905+
val rollupAggResMulti = rollupResMulti.asMap()["aggregations"] as Map<String, Map<String, Any>>
906+
907+
// When the cluster setting to search all jobs is off, the aggregations will be the same for searching a single job as for searching both
908+
assertEquals(
909+
"Searching single rollup job and rollup target index did not return the same max results",
910+
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"]
911+
)
912+
assertEquals(
913+
"Searching single rollup job and rollup target index did not return the same sum results",
914+
rawAgg1Res.getValue("sum_passenger_count")["value"], rollupAggResMulti.getValue("sum_passenger_count")["value"]
915+
)
916+
val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int
917+
assertEquals(
918+
"Searching single rollup job and rollup target index did not return the same value count results",
919+
rawAgg1Res.getValue("value_count_passenger_count")["value"], rollupAggResMulti.getValue("value_count_passenger_count")["value"]
920+
)
921+
922+
val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double
923+
updateSearchAllJobsClusterSetting(true)
924+
925+
val rollupResAll = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
926+
assertTrue(rollupResAll.restStatus() == RestStatus.OK)
927+
val rollupAggResAll = rollupResAll.asMap()["aggregations"] as Map<String, Map<String, Any>>
928+
929+
// With search all jobs setting on, the sum, and value_count will now be equal to the sum of the single job search results
930+
assertEquals(
931+
"Searching single rollup job and rollup target index did not return the same sum results",
932+
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResAll.getValue("max_passenger_count")["value"]
933+
)
934+
assertEquals(
935+
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
936+
trueAggSum, rollupAggResAll.getValue("sum_passenger_count")["value"]
937+
)
938+
assertEquals(
939+
"Searching rollup target index did not return the value count for all of the rollup jobs on the index",
940+
trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
941+
)
942+
}
943+
944+
fun `test rollup search multiple target indices failed`() {
945+
val sourceIndex1 = "source_rollup_search_multi_failed_1"
946+
val sourceIndex2 = "source_rollup_search_multi_failed_2"
947+
generateNYCTaxiData(sourceIndex1)
948+
generateNYCTaxiData(sourceIndex2)
949+
val targetIndex1 = "target_rollup_search_multi_failed_jobs1"
950+
val targetIndex2 = "target_rollup_search_multi_failed_jobs2"
951+
val rollupJob1 = Rollup(
952+
id = "hourly_basic_term_query_rollup_search_failed_1",
953+
enabled = true,
954+
schemaVersion = 1L,
955+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
956+
jobLastUpdatedTime = Instant.now(),
957+
jobEnabledTime = Instant.now(),
958+
description = "basic search test",
959+
sourceIndex = sourceIndex1,
960+
targetIndex = targetIndex1,
961+
metadataID = null,
962+
roles = emptyList(),
963+
pageSize = 10,
964+
delay = 0,
965+
continuous = false,
966+
dimensions = listOf(
967+
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
968+
Terms("VendorID", "VendorID"),
969+
),
970+
metrics = listOf(
971+
RollupMetrics(
972+
sourceField = "fare_amount", targetField = "fare_amount",
973+
metrics = listOf(
974+
Sum(), Min(), Max(),
975+
ValueCount(), Average()
976+
)
977+
),
978+
RollupMetrics(sourceField = "improvement_surcharge", targetField = "improvement_surcharge", metrics = listOf(Max(), Min()))
979+
)
980+
).let { createRollup(it, it.id) }
981+
982+
updateRollupStartTime(rollupJob1)
983+
984+
waitFor {
985+
val rollupJob = getRollup(rollupId = rollupJob1.id)
986+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
987+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
988+
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
989+
}
990+
991+
val rollupJob2 = Rollup(
992+
id = "hourly_basic_term_query_rollup_search_failed_2",
993+
enabled = true,
994+
schemaVersion = 1L,
995+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
996+
jobLastUpdatedTime = Instant.now(),
997+
jobEnabledTime = Instant.now(),
998+
description = "basic search test",
999+
sourceIndex = sourceIndex2,
1000+
targetIndex = targetIndex2,
1001+
metadataID = null,
1002+
roles = emptyList(),
1003+
pageSize = 10,
1004+
delay = 0,
1005+
continuous = false,
1006+
dimensions = listOf(
1007+
DateHistogram(sourceField = "tpep_dropoff_datetime", fixedInterval = "1h"),
1008+
Terms("RatecodeID", "RatecodeID"),
1009+
Terms("PULocationID", "PULocationID")
1010+
),
1011+
metrics = listOf(
1012+
RollupMetrics(
1013+
sourceField = "passenger_count", targetField = "passenger_count",
1014+
metrics = listOf(
1015+
Sum(), Min(), Max(),
1016+
ValueCount(), Average()
1017+
)
1018+
),
1019+
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
1020+
)
1021+
).let { createRollup(it, it.id) }
1022+
1023+
updateRollupStartTime(rollupJob2)
1024+
1025+
waitFor {
1026+
val rollupJob = getRollup(rollupId = rollupJob2.id)
1027+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
1028+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
1029+
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
1030+
}
1031+
1032+
refreshAllIndices()
1033+
1034+
val req = """
1035+
{
1036+
"size": 0,
1037+
"query": {
1038+
"term": { "RatecodeID": 1 }
1039+
},
1040+
"aggs": {
1041+
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
1042+
"max_passenger_count": { "max": { "field": "passenger_count" } },
1043+
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
1044+
}
1045+
}
1046+
""".trimIndent()
1047+
// Search 1 non-rollup index and 1 rollup
1048+
val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1049+
assertTrue(searchResult1.restStatus() == RestStatus.OK)
1050+
val failures = extractFailuresFromSearchResponse(searchResult1)
1051+
assertNotNull(failures)
1052+
assertEquals(1, failures?.size)
1053+
assertEquals(
1054+
"Searching multiple indices where one is rollup and other is not, didn't return failure",
1055+
"illegal_argument_exception", failures?.get(0)?.get("type") ?: "Didn't find failure type in search response"
1056+
1057+
)
1058+
assertEquals(
1059+
"Searching multiple indices where one is rollup and other is not, didn't return failure",
1060+
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response"
1061+
)
1062+
1063+
// Search 2 rollups with different mappings
1064+
try {
1065+
client().makeRequest(
1066+
"POST",
1067+
"/$targetIndex1,$targetIndex2/_search",
1068+
emptyMap(),
1069+
StringEntity(req, ContentType.APPLICATION_JSON)
1070+
)
1071+
} catch (e: ResponseException) {
1072+
assertEquals(
1073+
"Searching multiple rollup indices which weren't created by same rollup job, didn't return failure",
1074+
"Could not find a rollup job that can answer this query because [missing field RatecodeID, missing field passenger_count]",
1075+
(e.response.asMap() as Map<String, Map<String, Map<String, String>>>)["error"]!!["caused_by"]!!["reason"]
1076+
)
1077+
assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus())
1078+
}
1079+
}
7921080
}

0 commit comments

Comments
 (0)