Skip to content

Commit

Permalink
draft refactor of REST handling and Transport, still work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
  • Loading branch information
toepkerd-zz committed Sep 2, 2022
1 parent a48d1f4 commit 050f42a
Show file tree
Hide file tree
Showing 14 changed files with 555 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetFindingsAction
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.GetSuggestionsAction
import org.opensearch.alerting.action.IndexMonitorAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
Expand Down Expand Up @@ -47,6 +48,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetSuggestionsAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
Expand All @@ -65,6 +67,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetSuggestionsAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
Expand Down Expand Up @@ -122,6 +125,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val SUGGESTIONS_BASE_URI = "/_plugins/suggestions/alerting"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
@JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts"
Expand Down Expand Up @@ -165,7 +169,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetFindingsAction()
RestGetFindingsAction(),
RestGetSuggestionsAction()
)
}

Expand All @@ -184,8 +189,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(GetFindingsAction.INSTANCE, TransportGetFindingsSearchAction::class.java)

ActionPlugin.ActionHandler(GetFindingsAction.INSTANCE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(GetSuggestionsAction.INSTANCE, TransportGetSuggestionsAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.rules.inputs.util.SuggestionInput
import org.opensearch.alerting.rules.inputs.util.SuggestionInputFactory
import org.opensearch.alerting.rules.inputs.util.SuggestionInputType
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.XContentParser
import java.io.IOException

class GetSuggestionsRequest : ActionRequest {
val monitorId: String?
val monitor: Monitor?
val inputType: SuggestionInputType
val component: String
val input: SuggestionInput<*, Any> // TODO: is * safe here?

constructor(
monitorId: String?,
monitor: Monitor?
inputType: SuggestionInputType,
component: String,
xcp: XContentParser
) : super() {
this.monitorId = monitorId
this.monitor = monitor
this.inputType = inputType
this.component = component
this.input = SuggestionInputFactory.getInput(this.inputType, xcp)
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readOptionalString(), // monitorId
if (sin.readBoolean()) {
Monitor.readFrom(sin) // monitor
} else null
)

override fun validate(): ActionRequestValidationException? {
return null
constructor(sin: StreamInput) : super() {
this.inputType = sin.readEnum(SuggestionInputType::class.java) // inputType
this.component = sin.readString() // component
this.input = SuggestionInputFactory.getInput(this.inputType, sin)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(monitorId)
if (monitor != null) {
out.writeBoolean(true)
monitor.writeTo(out)
} else {
out.writeBoolean(false)
}
out.writeEnum(inputType)
out.writeString(component)
input.writeTo(out)
}

override fun validate(): ActionRequestValidationException? {
return null
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionResponse
Expand All @@ -12,7 +17,6 @@ import java.io.IOException
class GetSuggestionsResponse : ActionResponse, ToXContentObject {
var suggestions: List<String>
var status: RestStatus
// TODO: do we need id, primaryTerm, seqNo, etc, what are they used for?

constructor(
suggestions: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ data class Monitor(

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): Monitor? {
fun readFrom(sin: StreamInput): Monitor {
return Monitor(sin)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.action.GetSuggestionsAction
import org.opensearch.alerting.action.GetSuggestionsRequest
import org.opensearch.alerting.rules.inputs.util.SuggestionInput
import org.opensearch.alerting.rules.inputs.util.SuggestionInputType
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestHandler.Route
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestRequest.Method.POST
import org.opensearch.rest.action.RestToXContentListener

private val log = LogManager.getLogger(RestGetSuggestionsAction::class.java)

class RestGetSuggestionsAction : BaseRestHandler() {

override fun getName(): String = "get_suggestions_action"

override fun routes(): List<Route> {
return listOf(
Route(POST, AlertingPlugin.SUGGESTIONS_BASE_URI) // inline object with specific format is required
)
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.SUGGESTIONS_BASE_URI}")

return RestChannelConsumer { channel ->
var inputType: SuggestionInputType? = null
var component: String? = null
var hasInput = false

val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
SuggestionInput.INPUT_TYPE_FIELD -> inputType = SuggestionInputType.enumFromStr(xcp.text())
SuggestionInput.COMPONENT_FIELD -> component = xcp.text()
SuggestionInput.INPUT_FIELD -> {
hasInput = true
break
}
else -> throw IllegalArgumentException("request body must contain only input, inputType, and component fields")
}
}

if (inputType == null || component == null || !hasInput) {
throw IllegalArgumentException("request body must contain input, inputType, and component fields")
}

val getSuggestionsRequest = GetSuggestionsRequest(inputType, component, xcp) // xcp already pointing to beginning of input{} object

client.execute(GetSuggestionsAction.INSTANCE, getSuggestionsRequest, RestToXContentListener(channel))
}
}

// private fun validateInputs(request: RestRequest) {
// if (!request.hasParam(SuggestionInput.INPUT_FIELD) || !request.hasParam(SuggestionInput.INPUT_TYPE_FIELD) || !request.hasParam(SuggestionInput.COMPONENT_FIELD)) {
// throw IllegalArgumentException(
// "request body must contain input, inputType, and component fields, ${request.hasParam(SuggestionInput.INPUT_FIELD)}," +
// "${request.hasParam(SuggestionInput.INPUT_TYPE_FIELD)}, ${request.hasParam(SuggestionInput.COMPONENT_FIELD)}"
// )
// }
//
// val inputType = request.param(SuggestionInput.INPUT_TYPE_FIELD)
// val allowedInputTypes = SuggestionInputType.values().map { it.value }
// if (!allowedInputTypes.contains(inputType)) {
// throw IllegalArgumentException("invalid input type, must be one of $allowedInputTypes")
// }
// }
//
// // prepare by making it point to the start of the "input{}" object rather
// // than the start of the entire request body
// private fun prepareXcp(xcp: XContentParser) {
// ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
// while (xcp.nextToken() != Token.END_OBJECT) {
// val fieldName = xcp.currentName()
// xcp.nextToken()
// if (fieldName == SuggestionInput.INPUT_FIELD) {
// break
// } else {
// xcp.skipChildren()
// }
// }
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.rules

import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.rules.util.Rule

object ExampleRule : Rule {
// dummy example rule that checks for wildcard expressions in index declarations
override fun evaluate(monitor: Monitor): Boolean {
val input = monitor.inputs[0]

if (input is SearchInput) {
for (index in input.indices) {
if (index.contains("*")) {
return false
}
}
}

return true
}

override fun suggestion(): String {
return "an index in your inputs uses a wildcard (*), consider explicitly declaring indices instead"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.rules.inputs

import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.rules.inputs.util.SuggestionInput
import org.opensearch.client.Client
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import org.opensearch.rest.RestStatus

class MonitorIDInput() : SuggestionInput<String, Monitor> {

override var rawInput: String? = null
var obj: Monitor? = null

constructor(sin: StreamInput) : this() {
rawInput = sin.readOptionalString() // TODO: readString() or readOptionalString()?
}

/**
* User input requirements that will be checked for:
* input{} must contain exactly one field named "monitorId"
*
* whether or not it stores a valid monitor id is deferred until
* the id is used to query the Scheduled Job Index for the Monitor
*
* Error is thrown if any of the above is violated
*/
override fun parseInput(xcp: XContentParser) {

// parse input for monitor id
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) // start of input {} block
ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp) // name field, should be "monitorId"
if (xcp.currentName() != "monitorId") { // TODO: put String vals in constants in companion object
throw IllegalArgumentException("input must contain exactly one field named \"monitorId\" that stores a valid monitor id")
}
xcp.nextToken() // the value stored in the monitorId field, the monitor id itself
val monitorId: String = xcp.text() // TODO: setting to Monitor.NO_ID is redundant? consider doing it anyway, ie initialize at top and set later

this.rawInput = monitorId
}

override fun getObject(client: Client, xContentRegistry: NamedXContentRegistry): Monitor {
// check to ensure that parseInput was called first and rawInput is not null
if (rawInput == null) {
throw IllegalStateException("input was not parsed to get monitorId, parseInput() must be called first")
}

// use monitor id to retrieve Monitor object from Scheduled Jobs Index
var monitor: Monitor? = null

val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(this.rawInput)
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
throw OpenSearchStatusException("Monitor with ID $rawInput not found", RestStatus.NOT_FOUND)
}

if (!response.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
val receivedMonitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
monitor = receivedMonitor.copy()
}
}
}

override fun onFailure(e: Exception) {
throw IllegalStateException("onFailure: $e, $rawInput, $monitor")
}
}
)

if (monitor == null) {
throw IllegalStateException("if monitor not found, should have already failed and never gotten here")
}

return monitor as Monitor
}

override fun writeTo(out: StreamOutput) {
out.writeOptionalString(rawInput)
}

companion object {
@JvmStatic
fun readFrom(sin: StreamInput): MonitorIDInput {
return MonitorIDInput(sin)
}
}
}
Loading

0 comments on commit 050f42a

Please sign in to comment.