Skip to content

Commit 81ddb4b

Browse files
authored
integrate security-analytics & alerting for correlation engine (#412)
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
1 parent 88c537d commit 81ddb4b

File tree

6 files changed

+116
-0
lines changed

6 files changed

+116
-0
lines changed

src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import org.opensearch.commons.alerting.action.GetFindingsRequest
2020
import org.opensearch.commons.alerting.action.GetFindingsResponse
2121
import org.opensearch.commons.alerting.action.IndexMonitorRequest
2222
import org.opensearch.commons.alerting.action.IndexMonitorResponse
23+
import org.opensearch.commons.alerting.action.PublishFindingsRequest
24+
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
2325
import org.opensearch.commons.notifications.action.BaseResponse
2426
import org.opensearch.commons.utils.recreateObject
2527

@@ -143,6 +145,24 @@ object AlertingPluginInterface {
143145
)
144146
}
145147

148+
fun publishFinding(
149+
client: NodeClient,
150+
request: PublishFindingsRequest,
151+
listener: ActionListener<SubscribeFindingsResponse>
152+
) {
153+
client.execute(
154+
AlertingActions.SUBSCRIBE_FINDINGS_ACTION_TYPE,
155+
request,
156+
wrapActionListener(listener) { response ->
157+
recreateObject(response) {
158+
SubscribeFindingsResponse(
159+
it
160+
)
161+
}
162+
}
163+
)
164+
}
165+
146166
@Suppress("UNCHECKED_CAST")
147167
private fun <Response : BaseResponse> wrapActionListener(
148168
listener: ActionListener<Response>,

src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ object AlertingActions {
1212
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
1313
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
1414
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
15+
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
1516

1617
@JvmField
1718
val INDEX_MONITOR_ACTION_TYPE =
@@ -28,4 +29,7 @@ object AlertingActions {
2829
@JvmField
2930
val ACKNOWLEDGE_ALERTS_ACTION_TYPE =
3031
ActionType(ACKNOWLEDGE_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse)
32+
@JvmField
33+
val SUBSCRIBE_FINDINGS_ACTION_TYPE =
34+
ActionType(SUBSCRIBE_FINDINGS_ACTION_NAME, ::SubscribeFindingsResponse)
3135
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.opensearch.commons.alerting.action
2+
3+
import org.opensearch.action.ActionRequest
4+
import org.opensearch.action.ActionRequestValidationException
5+
import org.opensearch.common.io.stream.StreamInput
6+
import org.opensearch.common.io.stream.StreamOutput
7+
import org.opensearch.commons.alerting.model.Finding
8+
import java.io.IOException
9+
10+
class PublishFindingsRequest : ActionRequest {
11+
12+
val monitorId: String
13+
14+
val finding: Finding
15+
16+
constructor(
17+
monitorId: String,
18+
finding: Finding
19+
) : super() {
20+
this.monitorId = monitorId
21+
this.finding = finding
22+
}
23+
24+
@Throws(IOException::class)
25+
constructor(sin: StreamInput) : this(
26+
monitorId = sin.readString(),
27+
finding = Finding.readFrom(sin)
28+
)
29+
30+
override fun validate(): ActionRequestValidationException? {
31+
return null
32+
}
33+
34+
override fun writeTo(out: StreamOutput) {
35+
out.writeString(monitorId)
36+
finding.writeTo(out)
37+
}
38+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.opensearch.commons.alerting.action
2+
3+
import org.opensearch.common.io.stream.StreamInput
4+
import org.opensearch.common.io.stream.StreamOutput
5+
import org.opensearch.commons.notifications.action.BaseResponse
6+
import org.opensearch.core.xcontent.ToXContent
7+
import org.opensearch.core.xcontent.XContentBuilder
8+
import org.opensearch.rest.RestStatus
9+
import java.io.IOException
10+
11+
class SubscribeFindingsResponse : BaseResponse {
12+
13+
private var status: RestStatus
14+
15+
constructor(status: RestStatus) : super() {
16+
this.status = status
17+
}
18+
19+
@Throws(IOException::class)
20+
constructor(sin: StreamInput) {
21+
this.status = sin.readEnum(RestStatus::class.java)
22+
}
23+
24+
@Throws(IOException::class)
25+
override fun writeTo(out: StreamOutput) {
26+
out.writeEnum(status)
27+
}
28+
29+
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
30+
builder.startObject()
31+
.field("status", status.status)
32+
return builder.endObject()
33+
}
34+
35+
override fun getStatus(): RestStatus {
36+
return this.status
37+
}
38+
}

src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import java.time.Instant
1717
class Finding(
1818
val id: String = NO_ID,
1919
val relatedDocIds: List<String>,
20+
val correlatedDocIds: List<String> = listOf(),
2021
val monitorId: String,
2122
val monitorName: String,
2223
val index: String,
@@ -28,6 +29,7 @@ class Finding(
2829
constructor(sin: StreamInput) : this(
2930
id = sin.readString(),
3031
relatedDocIds = sin.readStringList(),
32+
correlatedDocIds = sin.readStringList(),
3133
monitorId = sin.readString(),
3234
monitorName = sin.readString(),
3335
index = sin.readString(),
@@ -39,6 +41,7 @@ class Finding(
3941
return mapOf(
4042
FINDING_ID_FIELD to id,
4143
RELATED_DOC_IDS_FIELD to relatedDocIds,
44+
CORRELATED_DOC_IDS_FIELD to correlatedDocIds,
4245
MONITOR_ID_FIELD to monitorId,
4346
MONITOR_NAME_FIELD to monitorName,
4447
INDEX_FIELD to index,
@@ -51,6 +54,7 @@ class Finding(
5154
builder.startObject()
5255
.field(FINDING_ID_FIELD, id)
5356
.field(RELATED_DOC_IDS_FIELD, relatedDocIds)
57+
.field(CORRELATED_DOC_IDS_FIELD, correlatedDocIds)
5458
.field(MONITOR_ID_FIELD, monitorId)
5559
.field(MONITOR_NAME_FIELD, monitorName)
5660
.field(INDEX_FIELD, index)
@@ -64,6 +68,7 @@ class Finding(
6468
override fun writeTo(out: StreamOutput) {
6569
out.writeString(id)
6670
out.writeStringCollection(relatedDocIds)
71+
out.writeStringCollection(correlatedDocIds)
6772
out.writeString(monitorId)
6873
out.writeString(monitorName)
6974
out.writeString(index)
@@ -74,6 +79,7 @@ class Finding(
7479
companion object {
7580
const val FINDING_ID_FIELD = "id"
7681
const val RELATED_DOC_IDS_FIELD = "related_doc_ids"
82+
const val CORRELATED_DOC_IDS_FIELD = "correlated_doc_ids"
7783
const val MONITOR_ID_FIELD = "monitor_id"
7884
const val MONITOR_NAME_FIELD = "monitor_name"
7985
const val INDEX_FIELD = "index"
@@ -86,6 +92,7 @@ class Finding(
8692
fun parse(xcp: XContentParser): Finding {
8793
var id: String = NO_ID
8894
val relatedDocIds: MutableList<String> = mutableListOf()
95+
val correlatedDocIds: MutableList<String> = mutableListOf()
8996
lateinit var monitorId: String
9097
lateinit var monitorName: String
9198
lateinit var index: String
@@ -105,6 +112,12 @@ class Finding(
105112
relatedDocIds.add(xcp.text())
106113
}
107114
}
115+
CORRELATED_DOC_IDS_FIELD -> {
116+
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
117+
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
118+
correlatedDocIds.add(xcp.text())
119+
}
120+
}
108121
MONITOR_ID_FIELD -> monitorId = xcp.text()
109122
MONITOR_NAME_FIELD -> monitorName = xcp.text()
110123
INDEX_FIELD -> index = xcp.text()
@@ -123,6 +136,7 @@ class Finding(
123136
return Finding(
124137
id = id,
125138
relatedDocIds = relatedDocIds,
139+
correlatedDocIds = correlatedDocIds,
126140
monitorId = monitorId,
127141
monitorName = monitorName,
128142
index = index,

src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ internal class GetFindingsResponseTests {
2121
val finding1 = Finding(
2222
"1",
2323
listOf("doc1", "doc2", "doc3"),
24+
listOf("doc1", "doc2", "doc3"),
2425
"monitor_id1",
2526
"monitor_name1",
2627
"test_index1",
@@ -39,6 +40,7 @@ internal class GetFindingsResponseTests {
3940
val finding2 = Finding(
4041
"1",
4142
listOf("doc21", "doc22"),
43+
listOf("doc21", "doc22"),
4244
"monitor_id2",
4345
"monitor_name2",
4446
"test_index2",

0 commit comments

Comments
 (0)