Skip to content

Commit a549a9d

Browse files
committed
enhance monitor to store only relevant fields for docs linked to findings
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com> fix it Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
1 parent 09f4cd6 commit a549a9d

File tree

5 files changed

+82
-11
lines changed

5 files changed

+82
-11
lines changed

src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class Finding(
2727
* Keeps the track of the workflow-monitor exact execution.
2828
* Used for filtering the data when chaining monitors in a workflow.
2929
*/
30-
val executionId: String? = null
30+
val executionId: String? = null,
31+
val additionalFields: Map<String, Any>? = mapOf()
3132
) : Writeable, ToXContent {
3233

3334
constructor(
@@ -46,7 +47,8 @@ class Finding(
4647
index = index,
4748
docLevelQueries = docLevelQueries,
4849
timestamp = timestamp,
49-
executionId = null
50+
executionId = null,
51+
additionalFields = null
5052
)
5153

5254
@Throws(IOException::class)
@@ -59,7 +61,8 @@ class Finding(
5961
index = sin.readString(),
6062
docLevelQueries = sin.readList((DocLevelQuery)::readFrom),
6163
timestamp = sin.readInstant(),
62-
executionId = sin.readOptionalString()
64+
executionId = sin.readOptionalString(),
65+
additionalFields = sin.readMap()
6366
)
6467

6568
fun asTemplateArg(): Map<String, Any?> {
@@ -87,6 +90,7 @@ class Finding(
8790
.field(QUERIES_FIELD, docLevelQueries.toTypedArray())
8891
.field(TIMESTAMP_FIELD, timestamp.toEpochMilli())
8992
.field(EXECUTION_ID_FIELD, executionId)
93+
.field(ADDITIONAL_FIELDS_FIELD, additionalFields)
9094
builder.endObject()
9195
return builder
9296
}
@@ -102,6 +106,7 @@ class Finding(
102106
out.writeCollection(docLevelQueries)
103107
out.writeInstant(timestamp)
104108
out.writeOptionalString(executionId)
109+
out.writeMap(additionalFields)
105110
}
106111

107112
companion object {
@@ -115,6 +120,7 @@ class Finding(
115120
const val TIMESTAMP_FIELD = "timestamp"
116121
const val EXECUTION_ID_FIELD = "execution_id"
117122
const val NO_ID = ""
123+
const val ADDITIONAL_FIELDS_FIELD = "additional_fields"
118124

119125
@JvmStatic
120126
@Throws(IOException::class)
@@ -128,6 +134,7 @@ class Finding(
128134
val queries: MutableList<DocLevelQuery> = mutableListOf()
129135
lateinit var timestamp: Instant
130136
var executionId: String? = null
137+
var additionalFields: Map<String, Any>? = null
131138

132139
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
133140
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -161,6 +168,7 @@ class Finding(
161168
timestamp = requireNotNull(xcp.instant())
162169
}
163170
EXECUTION_ID_FIELD -> executionId = xcp.textOrNull()
171+
ADDITIONAL_FIELDS_FIELD -> additionalFields = xcp.map()
164172
}
165173
}
166174

@@ -173,7 +181,8 @@ class Finding(
173181
index = index,
174182
docLevelQueries = queries,
175183
timestamp = timestamp,
176-
executionId = executionId
184+
executionId = executionId,
185+
additionalFields = additionalFields
177186
)
178187
}
179188

src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ data class Monitor(
4545
val dataSources: DataSources = DataSources(),
4646
val deleteQueryIndexInEveryRun: Boolean? = false,
4747
val shouldCreateSingleAlertForFindings: Boolean? = false,
48-
val owner: String? = "alerting"
48+
val owner: String? = "alerting",
49+
val metadataForFindings: List<String>? = listOf()
4950
) : ScheduledJob {
5051

5152
override val type = MONITOR_TYPE
@@ -123,7 +124,8 @@ data class Monitor(
123124
} else {
124125
false
125126
},
126-
owner = sin.readOptionalString()
127+
owner = sin.readOptionalString(),
128+
metadataForFindings = sin.readOptionalStringList()
127129
)
128130

129131
// This enum classifies different Monitors
@@ -185,6 +187,7 @@ data class Monitor(
185187
builder.field(DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD, deleteQueryIndexInEveryRun)
186188
builder.field(SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD, shouldCreateSingleAlertForFindings)
187189
builder.field(OWNER_FIELD, owner)
190+
builder.field(METADATA_FOR_FINDINGS_FIELD, metadataForFindings)
188191
if (params.paramAsBoolean("with_type", false)) builder.endObject()
189192
return builder.endObject()
190193
}
@@ -242,6 +245,7 @@ data class Monitor(
242245
out.writeOptionalBoolean(shouldCreateSingleAlertForFindings)
243246
}
244247
out.writeOptionalString(owner)
248+
out.writeOptionalStringCollection(metadataForFindings)
245249
}
246250

247251
companion object {
@@ -264,6 +268,7 @@ data class Monitor(
264268
const val DELETE_QUERY_INDEX_IN_EVERY_RUN_FIELD = "delete_query_index_in_every_run"
265269
const val SHOULD_CREATE_SINGLE_ALERT_FOR_FINDINGS_FIELD = "should_create_single_alert_for_findings"
266270
const val OWNER_FIELD = "owner"
271+
const val METADATA_FOR_FINDINGS_FIELD = "metadata_for_findings"
267272
val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}")
268273

269274
// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
@@ -294,6 +299,7 @@ data class Monitor(
294299
var deleteQueryIndexInEveryRun = false
295300
var delegateMonitor = false
296301
var owner = "alerting"
302+
var metadataForFindings: MutableList<String> = mutableListOf()
297303

298304
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
299305
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -357,6 +363,17 @@ data class Monitor(
357363
xcp.booleanValue()
358364
}
359365
OWNER_FIELD -> owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text()
366+
METADATA_FOR_FINDINGS_FIELD -> {
367+
XContentParserUtils.ensureExpectedToken(
368+
XContentParser.Token.START_ARRAY,
369+
xcp.currentToken(),
370+
xcp
371+
)
372+
373+
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
374+
metadataForFindings.add(xcp.text())
375+
}
376+
}
360377
else -> {
361378
xcp.skipChildren()
362379
}
@@ -385,7 +402,8 @@ data class Monitor(
385402
dataSources,
386403
deleteQueryIndexInEveryRun,
387404
delegateMonitor,
388-
owner
405+
owner,
406+
metadataForFindings
389407
)
390408
}
391409

src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,20 @@ internal class GetFindingsResponseTests {
2424
"monitor_name1",
2525
"test_index1",
2626
listOf(DocLevelQuery("1", "myQuery", listOf(), "fieldA:valABC", listOf())),
27-
Instant.now()
27+
Instant.now(),
28+
additionalFields = mapOf(Pair("field1", 1), Pair("field2", "value"))
2829
)
2930
val findingDocument1 = FindingDocument("test_index1", "doc1", true, "document 1 payload")
3031
val findingDocument2 = FindingDocument("test_index1", "doc2", true, "document 2 payload")
3132
val findingDocument3 = FindingDocument("test_index1", "doc3", true, "document 3 payload")
33+
val findingDocument4 = FindingDocument(
34+
"test_index1",
35+
"doc4",
36+
true,
37+
"document 4 payload"
38+
)
3239

33-
val findingWithDocs1 = FindingWithDocs(finding1, listOf(findingDocument1, findingDocument2, findingDocument3))
40+
val findingWithDocs1 = FindingWithDocs(finding1, listOf(findingDocument1, findingDocument2, findingDocument3, findingDocument4))
3441

3542
// Alerting GetFindingsResponse mock #2
3643

@@ -43,12 +50,19 @@ internal class GetFindingsResponseTests {
4350
"monitor_name2",
4451
"test_index2",
4552
listOf(DocLevelQuery("1", "myQuery", listOf(), "fieldA:valABC", listOf())),
46-
Instant.now()
53+
Instant.now(),
54+
additionalFields = mapOf(Pair("field1", 1), Pair("field2", "value"))
4755
)
4856
val findingDocument21 = FindingDocument("test_index2", "doc21", true, "document 21 payload")
4957
val findingDocument22 = FindingDocument("test_index2", "doc22", true, "document 22 payload")
58+
val findingDocument24 = FindingDocument(
59+
"test_index2",
60+
"doc22",
61+
true,
62+
"document 22 payload"
63+
)
5064

51-
val findingWithDocs2 = FindingWithDocs(finding2, listOf(findingDocument21, findingDocument22))
65+
val findingWithDocs2 = FindingWithDocs(finding2, listOf(findingDocument21, findingDocument22, findingDocument24))
5266

5367
val req = GetFindingsResponse(RestStatus.OK, 2, listOf(findingWithDocs1, findingWithDocs2))
5468
Assertions.assertNotNull(req)

src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import org.opensearch.commons.alerting.randomBucketLevelTrigger
1818
import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult
1919
import org.opensearch.commons.alerting.randomChainedAlertTrigger
2020
import org.opensearch.commons.alerting.randomDocLevelQuery
21+
import org.opensearch.commons.alerting.randomDocumentLevelMonitor
2122
import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult
2223
import org.opensearch.commons.alerting.randomDocumentLevelTrigger
2324
import org.opensearch.commons.alerting.randomInputRunResults
@@ -113,6 +114,20 @@ class WriteableTests {
113114
Assertions.assertEquals(newWorkflow, workflow, "Round tripping Workflow failed")
114115
}
115116

117+
@Test
118+
fun `test query-level monitor with metadata for findings as stream`() {
119+
val monitor = randomDocumentLevelMonitor().copy(
120+
inputs = listOf(DocLevelMonitorInput(indices = listOf("<test-{now/d}>"), queries = emptyList())),
121+
triggers = emptyList(),
122+
metadataForFindings = listOf("field1", "field2")
123+
)
124+
val out = BytesStreamOutput()
125+
monitor.writeTo(out)
126+
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
127+
val newMonitor = Monitor(sin)
128+
Assertions.assertEquals(monitor, newMonitor, "Round tripping QueryLevelMonitor doesn't work")
129+
}
130+
116131
@Test
117132
fun `test query-level trigger as stream`() {
118133
val trigger = randomQueryLevelTrigger()

src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.commons.alerting.randomAlert
2323
import org.opensearch.commons.alerting.randomBucketLevelMonitor
2424
import org.opensearch.commons.alerting.randomBucketLevelTrigger
2525
import org.opensearch.commons.alerting.randomDocLevelQuery
26+
import org.opensearch.commons.alerting.randomDocumentLevelMonitor
2627
import org.opensearch.commons.alerting.randomQueryLevelMonitor
2728
import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser
2829
import org.opensearch.commons.alerting.randomQueryLevelTrigger
@@ -127,6 +128,7 @@ class XContentTests {
127128
}
128129
}
129130

131+
@Test
130132
fun `test query-level monitor parsing`() {
131133
val monitor = randomQueryLevelMonitor()
132134

@@ -135,6 +137,19 @@ class XContentTests {
135137
assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor)
136138
}
137139

140+
@Test
141+
fun `test doc-level monitor parsing`() {
142+
val monitor = randomDocumentLevelMonitor().copy(
143+
inputs = listOf(DocLevelMonitorInput(indices = listOf("<test-{now/d}>"), queries = emptyList())),
144+
triggers = emptyList(),
145+
metadataForFindings = listOf("field1", "field2")
146+
)
147+
148+
val monitorString = monitor.toJsonStringWithUser()
149+
val parsedMonitor = Monitor.parse(parser(monitorString))
150+
assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor)
151+
}
152+
138153
@Test
139154
fun `test monitor parsing with no name`() {
140155
val monitorStringWithoutName = """

0 commit comments

Comments
 (0)