Skip to content

Commit

Permalink
backport monitor datasources to 2.x (#558)
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep authored Sep 15, 2022
1 parent f34ee22 commit 7e812ab
Show file tree
Hide file tree
Showing 16 changed files with 679 additions and 40 deletions.
1 change: 1 addition & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ dependencies {

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:4.7.0"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
21 changes: 14 additions & 7 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.AlertCategory
Expand Down Expand Up @@ -277,7 +278,13 @@ class AlertService(
} ?: listOf()
}

suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy, allowUpdatingAcknowledgedAlert: Boolean = false) {
suspend fun saveAlerts(
dataSources: DataSources,
alerts: List<Alert>,
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
) {
val alertIndex = dataSources.alertsIndex
var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand All @@ -286,7 +293,7 @@ class AlertService(
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(AlertIndices.ALERT_INDEX)
IndexRequest(alertIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
Expand All @@ -297,7 +304,7 @@ class AlertService(
// and updated by the MonitorRunner
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(AlertIndices.ALERT_INDEX)
IndexRequest(alertIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
Expand All @@ -311,10 +318,10 @@ class AlertService(
}
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
DeleteRequest(alertIndex, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
if (alertIndices.isAlertHistoryEnabled(dataSources)) {
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
Expand Down Expand Up @@ -349,7 +356,7 @@ class AlertService(
* The Alerts are required with their indexed ID so that when the new Alerts are updated after the Action execution,
* the ID is available for the index request so that the existing Alert can be updated, instead of creating a duplicate Alert document.
*/
suspend fun saveNewAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
suspend fun saveNewAlerts(dataSources: DataSources, alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
val savedAlerts = mutableListOf<Alert>()
var alertsBeingIndexed = alerts
var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
Expand All @@ -359,7 +366,7 @@ class AlertService(
if (alert.id != Alert.NO_ID) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
}
IndexRequest(AlertIndices.ALERT_INDEX)
IndexRequest(dataSources.alertsIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
}.toMutableList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {

var monitorResult = MonitorRunResult<BucketLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down Expand Up @@ -135,8 +135,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
* will still execute with the Alert information in the ctx but the Alerts may not be visible.
*/
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(dedupedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = true)
newAlerts = monitorCtx.alertService!!.saveNewAlerts(newAlerts, monitorCtx.retryPolicy!!)
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources, dedupedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = true
)
newAlerts = monitorCtx.alertService!!.saveNewAlerts(monitor.dataSources, newAlerts, monitorCtx.retryPolicy!!)
}

// Store deduped and new Alerts to accumulate across pages
Expand Down Expand Up @@ -269,9 +271,12 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// Update Alerts with action execution results (if it's not a test Monitor).
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false)
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources, updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false
)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources,
completedAlertsToUpdate.toList(),
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices.Companion.FINDING_HISTORY_WRITE_INDEX
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata
Expand All @@ -41,7 +40,6 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.index.query.BoolQueryBuilder
Expand Down Expand Up @@ -71,9 +69,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
} catch (e: Exception) {
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
Expand All @@ -87,7 +85,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(error = AlertingException.wrap(e))
}

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex()
monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
Expand Down Expand Up @@ -291,7 +289,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
alert.copy(actionExecutionResults = actionExecutionResults)
}

monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(updatedAlerts, it) }
monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) }
}
return triggerResult
}
Expand Down Expand Up @@ -320,7 +318,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Findings: $findingStr")

if (shouldCreateFinding) {
val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX)
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand Down Expand Up @@ -507,7 +505,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
boolQueryBuilder.filter(percolateQueryBuilder)

val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
val queryIndex = monitor.dataSources.queryIndex
val searchRequest = SearchRequest(queryIndex)
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
Expand All @@ -517,7 +516,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search percolate index: ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}")
throw IOException("Failed to search percolate index: $queryIndex")
}
return response.hits
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ object QueryLevelMonitorRunner : MonitorRunner() {

var monitorResult = MonitorRunResult<QueryLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down Expand Up @@ -81,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {

// Don't save alerts if this is a test monitor
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(updatedAlerts, it) }
monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) }
}
return monitorResult.copy(triggerResults = triggerResults)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.threadpool.Scheduler.Cancellable
import org.opensearch.threadpool.ThreadPool
import java.time.Instant
Expand Down Expand Up @@ -230,7 +231,12 @@ class AlertIndices(
return alertIndexInitialized && alertHistoryIndexInitialized
}

fun isAlertHistoryEnabled(): Boolean = alertHistoryEnabled
fun isAlertHistoryEnabled(dataSources: DataSources): Boolean {
if (dataSources.alertsIndex == ALERT_INDEX) {
return alertHistoryEnabled
}
return false
}

fun isFindingHistoryEnabled(): Boolean = findingHistoryEnabled

Expand All @@ -243,7 +249,23 @@ class AlertIndices(
}
alertIndexInitialized
}
suspend fun createOrUpdateAlertIndex(dataSources: DataSources) {
if (dataSources.alertsIndex == ALERT_INDEX) {
return createOrUpdateAlertIndex()
}
val alertsIndex = dataSources.alertsIndex
if (!clusterService.state().routingTable().hasIndex(alertsIndex)) {
alertIndexInitialized = createIndex(alertsIndex!!, alertMapping())
} else {
updateIndexMapping(alertsIndex!!, alertMapping())
}
}

suspend fun createOrUpdateInitialAlertHistoryIndex(dataSources: DataSources) {
if (dataSources.alertsIndex == ALERT_INDEX) {
return createOrUpdateInitialAlertHistoryIndex()
}
}
suspend fun createOrUpdateInitialAlertHistoryIndex() {
if (!alertHistoryIndexInitialized) {
alertHistoryIndexInitialized = createIndex(ALERT_HISTORY_INDEX_PATTERN, alertMapping(), ALERT_HISTORY_WRITE_INDEX)
Expand Down Expand Up @@ -273,6 +295,21 @@ class AlertIndices(
findingHistoryIndexInitialized
}

suspend fun createOrUpdateInitialFindingHistoryIndex(dataSources: DataSources) {
if (dataSources.findingsIndex == FINDING_HISTORY_WRITE_INDEX) {
return createOrUpdateInitialFindingHistoryIndex()
}
val findingsIndex = dataSources.findingsIndex
if (!clusterService.state().routingTable().hasIndex(findingsIndex)) {
createIndex(
findingsIndex,
findingMapping()
)
} else {
updateIndexMapping(findingsIndex, findingMapping(), false)
}
}

private suspend fun createIndex(index: String, schemaMapping: String, alias: String? = null): Boolean {
// This should be a fast check of local cluster state. Should be exceedingly rare that the local cluster
// state does not contain the index and multiple nodes concurrently try to create the index.
Expand Down Expand Up @@ -306,7 +343,7 @@ class AlertIndices(
return
}

var putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex)
val putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex)
.source(mapping, XContentType.JSON)
val updateResponse: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingRequest, it) }
if (updateResponse.isAcknowledged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting.resthandler
import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
Expand All @@ -21,6 +22,7 @@ import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
Expand Down Expand Up @@ -82,6 +84,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
validateDataSources(monitor)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Expand Down Expand Up @@ -121,6 +124,18 @@ class RestIndexMonitorAction : BaseRestHandler() {
}
}

private fun validateDataSources(monitor: Monitor) { // Data Sources will currently be supported only at transport layer.
if (monitor.dataSources != null) {
if (
monitor.dataSources.queryIndex != ScheduledJob.DOC_LEVEL_QUERIES_INDEX ||
monitor.dataSources.findingsIndex != AlertIndices.FINDING_HISTORY_WRITE_INDEX ||
monitor.dataSources.alertsIndex != AlertIndices.ALERT_INDEX
) {
throw IllegalArgumentException("Custom Data Sources are not allowed.")
}
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method):
RestResponseListener<IndexMonitorResponse> {
return object : RestResponseListener<IndexMonitorResponse>(channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class TransportExecuteMonitorAction @Inject constructor(
try {
scope.launch {
if (!docLevelMonitorQueries.docLevelQueryIndexExists()) {
docLevelMonitorQueries.initDocLevelQueryIndex()
docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources)
log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created")
}
docLevelMonitorQueries.indexDocLevelQueries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,17 +447,18 @@ class TransportIndexMonitorAction @Inject constructor(

@Suppress("UNCHECKED_CAST")
private suspend fun indexDocLevelMonitorQueries(monitor: Monitor, monitorId: String, refreshPolicy: RefreshPolicy) {
if (!docLevelMonitorQueries.docLevelQueryIndexExists()) {
docLevelMonitorQueries.initDocLevelQueryIndex()
log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created")
val queryIndex = monitor.dataSources.queryIndex
if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) {
docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources)
log.info("Central Percolation index $queryIndex created")
}
docLevelMonitorQueries.indexDocLevelQueries(
monitor,
monitorId,
refreshPolicy,
indexTimeout
)
log.debug("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}")
log.debug("Queries inserted into Percolate index $queryIndex")
}

private suspend fun updateMonitor() {
Expand Down
Loading

0 comments on commit 7e812ab

Please sign in to comment.