Skip to content

Commit

Permalink
optimize bucket level monitor to resolve alias to query only those ti…
Browse files Browse the repository at this point in the history
…me-series indices that contain docs within timeframe of range query filter in search input (#1701)

* optimize bucket level monitor to resolve alias to query only those time-series indices that contain docs within timeframe of range query filter in search input

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add tests to verify alias based optimziation scenarios

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix tests to verify aggregation query on alias optimziation with indices being skipped and not skipped

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* null check addded

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep authored Oct 21, 2024
1 parent a1bf5fb commit e3c9fc2
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
.registerInputService(
InputService(
client,
scriptService,
namedWriteableRegistry,
xContentRegistry,
clusterService,
settings,
indexNameExpressionResolver
)
)
.registerTriggerService(triggerService)
.registerAlertService(alertService)
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
Expand Down
135 changes: 133 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
Expand All @@ -40,12 +42,14 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Duration
import java.time.Instant

/** Service that handles the collection of input results for Monitor executions */
Expand All @@ -55,7 +59,8 @@ class InputService(
val namedWriteableRegistry: NamedWriteableRegistry,
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
val settings: Settings
val settings: Settings,
val indexNameExpressionResolver: IndexNameExpressionResolver
) {

private val logger = LogManager.getLogger(InputService::class.java)
Expand Down Expand Up @@ -245,8 +250,19 @@ class InputService(
.execute()

val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService)

val resolvedIndexes = if (searchInput.query.query() == null) indexes else {
val query = searchInput.query.query()
resolveOnlyQueryableIndicesFromLocalClusterAliases(
monitor,
periodEnd,
query,
indexes
)
}

val searchRequest = SearchRequest()
.indices(*indexes.toTypedArray())
.indices(*resolvedIndexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())

XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
Expand All @@ -256,6 +272,72 @@ class InputService(
return searchRequest
}

/**
* Resolves concrete indices from aliases based on a time range query and availability in the local cluster.
*
* <p>If an index passed to OpenSearch is an alias, this method will only select those indices
* resolved from the alias that meet the following criteria:
*
* <ol>
* <li>The index's creation date falls within the time range specified in the query's timestamp field.</li>
* <li>The index immediately preceding the time range in terms of creation date is also included.</li>
* </ol>
*
* <p>This ensures that queries targeting aliases consider relevant indices based on their creation time,
* including the one immediately before the specified range to account for potential data at the boundary.
*/
private fun resolveOnlyQueryableIndicesFromLocalClusterAliases(
monitor: Monitor,
periodEnd: Instant,
query: QueryBuilder,
indexes: List<String>,
): List<String> {
val resolvedIndexes = ArrayList<String>()
indexes.forEach {
// we don't optimize for remote cluster aliases. we directly pass them to search request
if (CrossClusterMonitorUtils.isRemoteClusterIndex(it, clusterService))
resolvedIndexes.add(it)
else {
val state = clusterService.state()
if (IndexUtils.isAlias(it, state)) {
val resolveStartTimeOfQueryTimeRange = resolveStartTimeofQueryTimeRange(monitor, query, periodEnd)
if (resolveStartTimeOfQueryTimeRange != null) {
val indices = IndexUtils.resolveAllIndices(listOf(it), clusterService, indexNameExpressionResolver)
val sortedIndices = indices
.mapNotNull { state.metadata().index(it) } // Get IndexMetadata for each index
.sortedBy { it.creationDate } // Sort by creation date

var includePrevious = true
for (i in sortedIndices.indices) {
val indexMetadata = sortedIndices[i]
val creationDate = indexMetadata.creationDate

if (creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()) {
resolvedIndexes.add(indexMetadata.index.name)
includePrevious = false // No need to include previous anymore
} else if (
includePrevious && (
i == sortedIndices.lastIndex ||
sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()
)
) {
// Include the index immediately before the timestamp
resolvedIndexes.add(indexMetadata.index.name)
includePrevious = false
}
}
} else {
// add alias without optimizing for resolve indices
resolvedIndexes.add(it)
}
} else {
resolvedIndexes.add(it)
}
}
}
return resolvedIndexes
}

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

Expand Down Expand Up @@ -289,4 +371,53 @@ class InputService(
}
return results
}

fun resolveStartTimeofQueryTimeRange(monitor: Monitor, query: QueryBuilder, periodEnd: Instant): Instant? {
try {
val rangeQuery = findRangeQuery(query) ?: return null
val searchParameter = rangeQuery.from().toString() // we are looking for 'timeframe' variable {{period_end}}||-<timeframe>

val timeframeString = searchParameter.substringAfter("||-")
val timeframeRegex = Regex("(\\d+)([a-zA-Z]+)")
val matchResult = timeframeRegex.find(timeframeString)
val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u }
?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString")
val duration = when (unit) {
"s" -> Duration.ofSeconds(amount.toLong())
"m" -> Duration.ofMinutes(amount.toLong())
"h" -> Duration.ofHours(amount.toLong())
"d" -> Duration.ofDays(amount.toLong())
else -> throw IllegalArgumentException("Invalid time unit: $unit")
}

return periodEnd.minus(duration)
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id}:" +
" Failed to resolve time frame of search query while optimizing to query only on few of alias' concrete indices",
e
)
return null // won't do optimization as we failed to resolve the timeframe due to unexpected error
}
}

private fun findRangeQuery(queryBuilder: QueryBuilder?): RangeQueryBuilder? {
if (queryBuilder == null) return null
if (queryBuilder is RangeQueryBuilder) return queryBuilder

if (queryBuilder is BoolQueryBuilder) {
for (clause in queryBuilder.must()) {
val rangeQuery = findRangeQuery(clause)
if (rangeQuery != null) return rangeQuery
}
for (clause in queryBuilder.should()) {
val rangeQuery = findRangeQuery(clause)
if (rangeQuery != null) return rangeQuery
}
// You can also check queryBuilder.filter() and queryBuilder.mustNot() if needed
}

// Add handling for other query types if necessary (e.g., NestedQueryBuilder, etc.)
return null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,5 +227,10 @@ class CrossClusterMonitorUtils {
return if (clusterName.isNotEmpty()) "$clusterName:$indexName"
else indexName
}

fun isRemoteClusterIndex(index: String, clusterService: ClusterService): Boolean {
val clusterName = parseClusterName(index)
return clusterName.isNotEmpty() && clusterService.clusterName.value() != clusterName
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1000,19 +1000,33 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex))
}

protected fun createTestAlias(
alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT),
numOfAliasIndices: Int = randomIntBetween(1, 10),
includeWriteIndex: Boolean = true,
indicesMapping: String,
): MutableMap<String, MutableMap<String, Boolean>> {
return createTestAlias(
alias = alias,
indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex),
indicesMapping = indicesMapping
)
}

protected fun createTestAlias(
alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT),
indices: Map<String, Boolean> = randomAliasIndices(
alias = alias,
num = randomIntBetween(1, 10),
includeWriteIndex = true
),
createIndices: Boolean = true
createIndices: Boolean = true,
indicesMapping: String = ""
): MutableMap<String, MutableMap<String, Boolean>> {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
if (createIndices) createTestIndex(index = it, mapping = "")
if (createIndices) createTestIndex(index = it, indicesMapping)
val isWriteIndex = indices.getOrDefault(it, false)
indicesMap[it] = isWriteIndex
val indexMap = mapOf(
Expand Down Expand Up @@ -1211,6 +1225,41 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
}
}

protected fun insertSampleTimeSerializedDataCurrentTime(index: String, data: List<String>) {
data.forEachIndexed { i, value ->
val time = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
indexDoc(index, (i + 1).toString(), testDoc)
}
}

protected fun insertSampleTimeSerializedDataWithTime(
index: String,
data: List<String>,
time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS),
) {
data.forEachIndexed { i, value ->
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
indexDoc(index, (i + 1).toString(), testDoc)
}
}

protected fun deleteDataWithDocIds(index: String, docIds: List<String>) {
docIds.forEach {
deleteDoc(index, it)
Expand Down
Loading

0 comments on commit e3c9fc2

Please sign in to comment.