Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
import org.opensearch.alerting.actionv2.GetAlertsV2Action
import org.opensearch.alerting.actionv2.GetMonitorV2Action
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alertsv2.AlertV2Indices
import org.opensearch.alerting.alertsv2.AlertV2Indices.Companion.ALL_ALERT_V2_INDEX_PATTERN
import org.opensearch.alerting.alertsv2.AlertV2Mover
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
import org.opensearch.alerting.core.JobSweeper
Expand All @@ -29,6 +33,7 @@ import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportActio
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.AlertingV2Settings
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.modelv2.MonitorV2
Expand Down Expand Up @@ -57,6 +62,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestGetAlertsV2Action
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestSearchMonitorV2Action
Expand Down Expand Up @@ -93,6 +99,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
import org.opensearch.alerting.transportv2.TransportGetAlertsV2Action
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
import org.opensearch.alerting.transportv2.TransportSearchMonitorV2Action
Expand Down Expand Up @@ -194,8 +201,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
lateinit var alertV2Indices: AlertV2Indices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
lateinit var alertV2Mover: AlertV2Mover
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
Expand Down Expand Up @@ -239,6 +248,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestDeleteMonitorV2Action(),
RestGetMonitorV2Action(),
RestSearchMonitorV2Action(settings, clusterService),
RestGetAlertsV2Action(),
)
}

Expand Down Expand Up @@ -278,6 +288,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
)
}

Expand Down Expand Up @@ -314,6 +325,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
val settings = environment.settings()
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
alertV2Indices = AlertV2Indices(settings, client, threadPool, clusterService)
val alertService = AlertService(client, xContentRegistry, alertIndices)
val triggerService = TriggerService(scriptService)
runner = MonitorRunnerService
Expand All @@ -325,6 +337,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerAlertV2Indices(alertV2Indices)
.registerInputService(
InputService(
client,
Expand All @@ -351,6 +364,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
alertV2Mover = AlertV2Mover(environment.settings(), client, threadPool, clusterService, xContentRegistry)
this.threadPool = threadPool
this.clusterService = clusterService

Expand Down Expand Up @@ -378,6 +392,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
alertV2Mover,
lockService,
alertService,
triggerService
Expand Down Expand Up @@ -475,7 +490,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_V2_QUERY_RESULTS_MAX_SIZE,
AlertingSettings.ALERT_V2_PER_RESULT_TRIGGER_MAX_ALERTS,
AlertingSettings.NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH,
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH,
AlertingV2Settings.ALERTING_V2_ENABLED
)
}

Expand All @@ -494,6 +510,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
SystemIndexDescriptor(ALL_ALERT_INDEX_PATTERN, "Alerting Plugin system index pattern"),
SystemIndexDescriptor(SCHEDULED_JOBS_INDEX, "Alerting Plugin Configuration index"),
SystemIndexDescriptor(ALL_COMMENTS_INDEX_PATTERN, "Alerting Comments system index pattern"),
SystemIndexDescriptor(ALL_ALERT_V2_INDEX_PATTERN, "Alerting V2 Alerts index pattern")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alertsv2.AlertV2Indices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
Expand Down Expand Up @@ -35,6 +36,7 @@ data class MonitorRunnerExecutionContext(
var settings: Settings? = null,
var threadPool: ThreadPool? = null,
var alertIndices: AlertIndices? = null,
var alertV2Indices: AlertV2Indices? = null,
var inputService: InputService? = null,
var triggerService: TriggerService? = null,
var alertService: AlertService? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import org.opensearch.alerting.action.ExecuteWorkflowRequest
import org.opensearch.alerting.action.ExecuteWorkflowResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.alertsv2.AlertV2Indices
import org.opensearch.alerting.alertsv2.AlertV2Mover.Companion.moveAlertV2s
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.modelv2.MonitorV2
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.remote.monitors.RemoteDocumentLevelMonitorRunner
Expand Down Expand Up @@ -137,6 +140,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerAlertV2Indices(alertV2Indices: AlertV2Indices): MonitorRunnerService {
this.monitorCtx.alertV2Indices = alertV2Indices
return this
}

fun registerInputService(inputService: InputService): MonitorRunnerService {
this.monitorCtx.inputService = inputService
return this
Expand Down Expand Up @@ -316,6 +324,18 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
logger.error("Failed to move active alerts for monitor [${job.id}].", e)
}
}
} else if (job is MonitorV2) {
launch {
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
moveAlertV2s(job.id, job, monitorCtx)
}
}
} catch (e: Exception) {
logger.error("Failed to move active alertV2s for monitorV2 [${job.id}].", e)
}
}
} else {
throw IllegalArgumentException("Invalid job type")
}
Expand All @@ -339,6 +359,15 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
} catch (e: Exception) {
logger.error("Failed to move active alerts for monitor [$jobId].", e)
}
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
moveAlertV2s(jobId, null, monitorCtx)
}
}
} catch (e: Exception) {
logger.error("Failed to move active alertV2s for monitorV2 [$jobId].", e)
}
}
}

Expand Down Expand Up @@ -433,20 +462,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
): MonitorRunResult<*> {
// Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping
// has not been updated.
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
IndexUtils.updateIndexMapping(
ScheduledJob.SCHEDULED_JOBS_INDEX,
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
}

override fun onFailure(t: Exception) {
logger.error("Failed to update config index schema", t)
}
}
)
}
updateAlertingConfigIndexSchema()

if (job is Workflow) {
logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun")
Expand Down Expand Up @@ -582,4 +598,21 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

private fun updateAlertingConfigIndexSchema() {
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
IndexUtils.updateIndexMapping(
ScheduledJob.SCHEDULED_JOBS_INDEX,
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
}

override fun onFailure(t: Exception) {
logger.error("Failed to update config index schema", t)
}
}
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.actionv2

import org.opensearch.action.ActionType

class GetAlertsV2Action private constructor() : ActionType<GetAlertsV2Response>(NAME, ::GetAlertsV2Response) {
companion object {
val INSTANCE = GetAlertsV2Action()
const val NAME = "cluster:admin/opensearch/alerting/v2/alerts/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.actionv2

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.Table
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class GetAlertsV2Request : ActionRequest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serde unit tests

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding

val table: Table
val severityLevel: String
val monitorV2Ids: List<String>?

constructor(
table: Table,
severityLevel: String,
monitorV2Ids: List<String>? = null,
) : super() {
this.table = table
this.severityLevel = severityLevel
this.monitorV2Ids = monitorV2Ids
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
table = Table.readFrom(sin),
severityLevel = sin.readString(),
monitorV2Ids = sin.readOptionalStringList(),
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
table.writeTo(out)
out.writeString(severityLevel)
out.writeOptionalStringCollection(monitorV2Ids)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.actionv2

import org.opensearch.alerting.modelv2.AlertV2
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException
import java.util.Collections

class GetAlertsV2Response : BaseResponse {
val alertV2s: List<AlertV2>

// totalAlertV2s is not the same as the size of alertV2s because there can be 30 alerts from the request, but
// the request only asked for 5 alerts, so totalAlertV2s will be 30, but alertV2s will only contain 5 alerts
val totalAlertV2s: Int?

constructor(
alertV2s: List<AlertV2>,
totalAlertV2s: Int?
) : super() {
this.alertV2s = alertV2s
this.totalAlertV2s = totalAlertV2s
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
alertV2s = Collections.unmodifiableList(sin.readList(::AlertV2)),
totalAlertV2s = sin.readOptionalInt()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeCollection(alertV2s)
out.writeOptionalInt(totalAlertV2s)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("alerts_v2", alertV2s)
.field("total_alerts_v2", totalAlertV2s)

return builder.endObject()
}
}
Loading
Loading