Skip to content

Commit 83c0faa

Browse files
stevanbzstevanbuzejic
authored andcommitted
Added function for request recreation that considers the writeable re… (#303)
* Added function for request recreation that considers the writeable registry used for parsing the aggregations Co-authored-by: Stevan Buzejic <stevan.buzejic@htecgroup.com> (cherry picked from commit 6a169f7)
1 parent 5904e9c commit 83c0faa

File tree

4 files changed

+92
-3
lines changed

4 files changed

+92
-3
lines changed

src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.commons.alerting
77
import org.opensearch.action.ActionListener
88
import org.opensearch.action.ActionResponse
99
import org.opensearch.client.node.NodeClient
10+
import org.opensearch.common.io.stream.NamedWriteableRegistry
1011
import org.opensearch.common.io.stream.Writeable
1112
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
1213
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
@@ -31,26 +32,27 @@ object AlertingPluginInterface {
3132
* Index monitor interface.
3233
* @param client Node client for making transport action
3334
* @param request The request object
35+
* @param namedWriteableRegistry Registry for building aggregations
3436
* @param listener The listener for getting response
3537
*/
3638
fun indexMonitor(
3739
client: NodeClient,
3840
request: IndexMonitorRequest,
41+
namedWriteableRegistry: NamedWriteableRegistry,
3942
listener: ActionListener<IndexMonitorResponse>
4043
) {
4144
client.execute(
4245
AlertingActions.INDEX_MONITOR_ACTION_TYPE,
4346
request,
4447
wrapActionListener(listener) { response ->
45-
recreateObject(response) {
48+
recreateObject(response, namedWriteableRegistry) {
4649
IndexMonitorResponse(
4750
it
4851
)
4952
}
5053
}
5154
)
5255
}
53-
5456
fun deleteMonitor(
5557
client: NodeClient,
5658
request: DeleteMonitorRequest,

src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package org.opensearch.commons.utils
77

88
import org.opensearch.common.io.stream.InputStreamStreamInput
9+
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
10+
import org.opensearch.common.io.stream.NamedWriteableRegistry
911
import org.opensearch.common.io.stream.OutputStreamStreamOutput
1012
import org.opensearch.common.io.stream.StreamInput
1113
import org.opensearch.common.io.stream.StreamOutput
@@ -36,3 +38,19 @@ inline fun <reified Request> recreateObject(writeable: Writeable, block: (Stream
3638
}
3739
}
3840
}
41+
42+
/**
43+
* Re create the object from the writeable. Uses NamedWriteableRegistry in order to build the aggregations.
44+
* This method needs to be inline and reified so that when this is called from
45+
* doExecute() of transport action, the object may be created from other JVM.
46+
*/
47+
inline fun <reified Request> recreateObject(writeable: Writeable, namedWriteableRegistry: NamedWriteableRegistry, block: (StreamInput) -> Request): Request {
48+
ByteArrayOutputStream().use { byteArrayOutputStream ->
49+
OutputStreamStreamOutput(byteArrayOutputStream).use {
50+
writeable.writeTo(it)
51+
InputStreamStreamInput(ByteArrayInputStream(byteArrayOutputStream.toByteArray())).use { streamInput ->
52+
return block(NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry))
53+
}
54+
}
55+
}
56+
}

src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import org.mockito.junit.jupiter.MockitoExtension
1212
import org.opensearch.action.ActionListener
1313
import org.opensearch.action.ActionType
1414
import org.opensearch.client.node.NodeClient
15+
import org.opensearch.common.io.stream.NamedWriteableRegistry
16+
import org.opensearch.common.settings.Settings
1517
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
1618
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
1719
import org.opensearch.commons.alerting.action.GetAlertsRequest
@@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.FindingWithDocs
2527
import org.opensearch.commons.alerting.model.Monitor
2628
import org.opensearch.index.seqno.SequenceNumbers
2729
import org.opensearch.rest.RestStatus
30+
import org.opensearch.search.SearchModule
2831

2932
@Suppress("UNCHECKED_CAST")
3033
@ExtendWith(MockitoExtension::class)
@@ -41,13 +44,33 @@ internal class AlertingPluginInterfaceTests {
4144
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
4245
val listener: ActionListener<IndexMonitorResponse> =
4346
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
47+
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)
4448

4549
Mockito.doAnswer {
4650
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
4751
.onResponse(response)
4852
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
4953

50-
AlertingPluginInterface.indexMonitor(client, request, listener)
54+
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
55+
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
56+
}
57+
58+
@Test
59+
fun indexBucketMonitor() {
60+
val monitor = randomBucketLevelMonitor()
61+
62+
val request = mock(IndexMonitorRequest::class.java)
63+
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
64+
val listener: ActionListener<IndexMonitorResponse> =
65+
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
66+
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)
67+
68+
Mockito.doAnswer {
69+
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
70+
.onResponse(response)
71+
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
72+
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
73+
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
5174
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
5275
}
5376

src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorRequestTests.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ import org.junit.jupiter.api.Assertions
44
import org.junit.jupiter.api.Test
55
import org.opensearch.action.support.WriteRequest
66
import org.opensearch.common.io.stream.BytesStreamOutput
7+
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
8+
import org.opensearch.common.io.stream.NamedWriteableRegistry
79
import org.opensearch.common.io.stream.StreamInput
10+
import org.opensearch.common.settings.Settings
811
import org.opensearch.commons.alerting.model.SearchInput
12+
import org.opensearch.commons.alerting.randomBucketLevelMonitor
913
import org.opensearch.commons.alerting.randomQueryLevelMonitor
14+
import org.opensearch.commons.utils.recreateObject
1015
import org.opensearch.rest.RestRequest
16+
import org.opensearch.search.SearchModule
1117
import org.opensearch.search.builder.SearchSourceBuilder
1218

1319
class IndexMonitorRequestTests {
@@ -32,6 +38,46 @@ class IndexMonitorRequestTests {
3238
Assertions.assertNotNull(newReq.monitor)
3339
}
3440

41+
@Test
42+
fun `test index bucket monitor post request`() {
43+
val req = IndexMonitorRequest(
44+
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
45+
randomBucketLevelMonitor()
46+
)
47+
Assertions.assertNotNull(req)
48+
49+
val out = BytesStreamOutput()
50+
req.writeTo(out)
51+
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
52+
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)
53+
val newReq = IndexMonitorRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry))
54+
Assertions.assertEquals("1234", newReq.monitorId)
55+
Assertions.assertEquals(1L, newReq.seqNo)
56+
Assertions.assertEquals(2L, newReq.primaryTerm)
57+
Assertions.assertEquals(RestRequest.Method.POST, newReq.method)
58+
Assertions.assertNotNull(newReq.monitor)
59+
}
60+
61+
@Test
62+
fun `Index bucket monitor serialize and deserialize transport object should be equal`() {
63+
val bucketLevelMonitorRequest = IndexMonitorRequest(
64+
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
65+
randomBucketLevelMonitor()
66+
)
67+
68+
Assertions.assertThrows(UnsupportedOperationException::class.java) {
69+
recreateObject(bucketLevelMonitorRequest) { IndexMonitorRequest(it) }
70+
}
71+
72+
val recreatedObject = recreateObject(bucketLevelMonitorRequest, NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)) { IndexMonitorRequest(it) }
73+
Assertions.assertEquals(bucketLevelMonitorRequest.monitorId, recreatedObject.monitorId)
74+
Assertions.assertEquals(bucketLevelMonitorRequest.seqNo, recreatedObject.seqNo)
75+
Assertions.assertEquals(bucketLevelMonitorRequest.primaryTerm, recreatedObject.primaryTerm)
76+
Assertions.assertEquals(bucketLevelMonitorRequest.method, recreatedObject.method)
77+
Assertions.assertNotNull(recreatedObject.monitor)
78+
Assertions.assertEquals(bucketLevelMonitorRequest.monitor, recreatedObject.monitor)
79+
}
80+
3581
@Test
3682
fun `test index monitor put request`() {
3783

0 commit comments

Comments
 (0)