Skip to content

Commit

Permalink
Adds support for using findings from list of monitors as input data f…
Browse files Browse the repository at this point in the history
…or a monitor in workflow (#1112)

* support using findings from list ofmonitors as input data for a monitor in workflow

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* update exception message assertion in test

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix code comments in tests and source code

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix grammar in code comment

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep authored Sep 2, 2023
1 parent 0f2dec7 commit 7b9584c
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,26 @@ class WorkflowService(
* Returns finding doc ids per index for the given workflow execution
* Used for pre-filtering the dataset in the case of creating a workflow with chained findings
*
* @param chainedMonitor Monitor that is previously executed
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(chainedMonitor.dataSources.findingsIndex).local(true), it)
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
// Search findings index per monitor and workflow execution id
val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, chainedMonitor.id))
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
QueryBuilders.termsQuery(
Finding.MONITOR_ID_FIELD,
chainedMonitors.map { it.id }
)
)
.filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId))
val searchRequest = SearchRequest()
.source(
Expand All @@ -57,7 +66,7 @@ class WorkflowService(
.version(true)
.seqNoAndPrimaryTerm(true)
)
.indices(chainedMonitor.dataSources.findingsIndex)
.indices(dataSources.findingsIndex)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }

// Get the findings docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,27 +528,39 @@ class TransportIndexWorkflowAction @Inject constructor(

val monitorsById = monitorDelegates.associateBy { it.id }
delegates.forEach {

val delegateMonitor = monitorsById[it.monitorId] ?: throw AlertingException.wrap(
IllegalArgumentException("Delegate monitor ${it.monitorId} doesn't exist")
)
if (it.chainedMonitorFindings != null) {
val chainedFindingMonitor =
monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap(
IllegalArgumentException("Chained finding monitor doesn't exist")
)

if (chainedFindingMonitor.isQueryLevelMonitor()) {
throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings"))
val chainedMonitorIds: MutableList<String> = mutableListOf()
if (it.chainedMonitorFindings!!.monitorId.isNullOrBlank()) {
chainedMonitorIds.addAll(it.chainedMonitorFindings!!.monitorIds)
} else {
chainedMonitorIds.add(it.chainedMonitorFindings!!.monitorId!!)
}
chainedMonitorIds.forEach { chainedMonitorId ->
val chainedFindingMonitor =
monitorsById[chainedMonitorId] ?: throw AlertingException.wrap(
IllegalArgumentException("Chained finding monitor $chainedMonitorId doesn't exist")
)

if (chainedFindingMonitor.isQueryLevelMonitor()) {
throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings"))
}

val delegateMonitorIndices = getMonitorIndices(delegateMonitor)
val delegateMonitorIndices = getMonitorIndices(delegateMonitor)

val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor)
val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor)

if (!delegateMonitorIndices.equalsIgnoreOrder(chainedMonitorIndices)) {
throw AlertingException.wrap(
IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices")
)
if (!delegateMonitorIndices.containsAll(chainedMonitorIndices)) {
throw AlertingException.wrap(
IllegalArgumentException(
"Delegate monitor indices ${delegateMonitorIndices.joinToString()} " +
"doesn't query all of chained findings monitor's indices ${chainedMonitorIndices.joinToString()}}"
)
)
}
}
}
}
Expand Down Expand Up @@ -581,7 +593,7 @@ class TransportIndexWorkflowAction @Inject constructor(

private fun validateDelegateMonitorsExist(
monitorIds: List<String>,
delegateMonitors: List<Monitor>
delegateMonitors: List<Monitor>,
) {
val reqMonitorIds: MutableList<String> = monitorIds as MutableList<String>
delegateMonitors.forEach {
Expand All @@ -601,7 +613,7 @@ class TransportIndexWorkflowAction @Inject constructor(
request: IndexWorkflowRequest,
user: User?,
client: Client,
actionListener: ActionListener<AcknowledgedResponse>
actionListener: ActionListener<AcknowledgedResponse>,
) {
val compositeInput = request.workflow.inputs[0] as CompositeInput
val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,23 @@ object CompositeWorkflowRunner : WorkflowRunner() {
IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
if (delegate.chainedMonitorFindings != null) {
val chainedMonitor = monitorsById[delegate.chainedMonitorFindings!!.monitorId]
?: throw AlertingException.wrap(
IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
val chainedMonitorIds: MutableList<String> = mutableListOf()
if (delegate.chainedMonitorFindings!!.monitorId.isNullOrBlank()) {
chainedMonitorIds.addAll(delegate.chainedMonitorFindings!!.monitorIds)
} else {
chainedMonitorIds.add(delegate.chainedMonitorFindings!!.monitorId!!)
}
val chainedMonitors = mutableListOf<Monitor>()
chainedMonitorIds.forEach {
val chainedMonitor = monitorsById[it]
?: throw AlertingException.wrap(
IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
chainedMonitors.add(chainedMonitor)
}

try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId)
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Table
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
Expand Down Expand Up @@ -2698,6 +2700,121 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2"))
}

fun `test execute workflow with multiple monitors in chained monitor findings of single monitor`() {
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex1 = "custom_alerts_index"
val customFindingsIndex1 = "custom_findings_index"
val customFindingsIndexPattern1 = "custom_findings_index-1"
var monitor1 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput1),
triggers = listOf(trigger1),
enabled = false,
dataSources = DataSources(
alertsIndex = customAlertsIndex1,
findingsIndex = customFindingsIndex1,
findingsIndexPattern = customFindingsIndexPattern1
)
)
val monitorResponse = createMonitor(monitor1)!!

val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor2 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput2),
triggers = listOf(trigger2),
enabled = false,
dataSources = DataSources(
alertsIndex = customAlertsIndex1,
findingsIndex = customFindingsIndex1,
findingsIndexPattern = customFindingsIndexPattern1
)
)

val monitorResponse2 = createMonitor(monitor2)!!
val docQuery3 = DocLevelQuery(query = "_id:*", name = "5")
val docLevelInput3 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery3))
val trigger3 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)

var monitor3 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput3),
triggers = listOf(trigger3),
enabled = false,
dataSources = DataSources(
alertsIndex = customAlertsIndex1,
findingsIndex = customFindingsIndex1,
findingsIndexPattern = customFindingsIndexPattern1
)
)

val monitorResponse3 = createMonitor(monitor3)!!
val d1 = Delegate(1, monitorResponse.id)
val d2 = Delegate(2, monitorResponse2.id)
val d3 = Delegate(
3, monitorResponse3.id,
ChainedMonitorFindings(null, listOf(monitorResponse.id, monitorResponse2.id))
)
var workflow = Workflow(
id = "",
name = "test",
enabled = false,
schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
lastUpdateTime = Instant.now(),
enabledTime = null,
workflowType = Workflow.WorkflowType.COMPOSITE,
user = randomUser(),
inputs = listOf(CompositeInput(org.opensearch.commons.alerting.model.Sequence(listOf(d1, d2, d3)))),
version = -1L,
schemaVersion = 0,
triggers = emptyList(),
auditDelegateMonitorAlerts = false

)
val workflowResponse = upsertWorkflow(workflow)!!
val workflowById = searchWorkflow(workflowResponse.id)
assertNotNull(workflowById)

var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
// Matches monitor1
val testDoc1 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16644,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "1", testDoc1)

testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
// Matches monitor1 and monitor2
val testDoc2 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "2", testDoc2)

testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
// Matches monitor1 and monitor2
val testDoc3 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-east-1"
}"""
indexDoc(index, "3", testDoc3)

val workflowId = workflowResponse.id
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
val monitorsRunResults = executeWorkflowResponse.workflowRunResult.monitorRunResults
assertEquals(3, monitorsRunResults.size)
assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2"))
assertFindings(monitorResponse2.id, customFindingsIndex1, 2, 2, listOf("2", "3"))
assertFindings(monitorResponse3.id, customFindingsIndex1, 3, 3, listOf("1", "2", "3"))
}

fun `test execute workflows with shared doc level monitor delegate`() {
val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down Expand Up @@ -5307,7 +5424,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
e.message?.let {
assertTrue(
"Exception not returning IndexWorkflow Action error ",
it.contains("Delegate monitor and it's chained finding monitor must query the same indices")
it.contains("doesn't query all of chained findings monitor's indices")
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
e.message?.let {
assertTrue(
"Exception not returning IndexWorkflow Action error ",
it.contains("Delegate monitor and it's chained finding monitor must query the same indices")
it.contains("doesn't query all of chained findings monitor's indices")
)
}
}
Expand Down

0 comments on commit 7b9584c

Please sign in to comment.