Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.actionv2.DeleteMonitorV2Action
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Action
import org.opensearch.alerting.actionv2.GetAlertsV2Action
import org.opensearch.alerting.actionv2.GetMonitorV2Action
import org.opensearch.alerting.actionv2.IndexMonitorV2Action
import org.opensearch.alerting.actionv2.SearchMonitorV2Action
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alertsv2.AlertV2Indices
import org.opensearch.alerting.alertsv2.AlertV2Indices.Companion.ALL_ALERT_V2_INDEX_PATTERN
import org.opensearch.alerting.alertsv2.AlertV2Mover
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN
import org.opensearch.alerting.core.JobSweeper
Expand All @@ -28,7 +33,9 @@ import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsV2Handler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.AlertingV2Settings
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.modelv2.MonitorV2
Expand Down Expand Up @@ -57,6 +64,8 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.resthandlerv2.RestDeleteMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestExecuteMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestGetAlertsV2Action
import org.opensearch.alerting.resthandlerv2.RestGetMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestIndexMonitorV2Action
import org.opensearch.alerting.resthandlerv2.RestSearchMonitorV2Action
Expand Down Expand Up @@ -93,6 +102,8 @@ import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.transportv2.TransportDeleteMonitorV2Action
import org.opensearch.alerting.transportv2.TransportExecuteMonitorV2Action
import org.opensearch.alerting.transportv2.TransportGetAlertsV2Action
import org.opensearch.alerting.transportv2.TransportGetMonitorV2Action
import org.opensearch.alerting.transportv2.TransportIndexMonitorV2Action
import org.opensearch.alerting.transportv2.TransportSearchMonitorV2Action
Expand Down Expand Up @@ -194,8 +205,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
lateinit var alertV2Indices: AlertV2Indices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
lateinit var alertV2Mover: AlertV2Mover
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
Expand Down Expand Up @@ -236,9 +249,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

// Alerting V2
RestIndexMonitorV2Action(),
RestExecuteMonitorV2Action(),
RestDeleteMonitorV2Action(),
RestGetMonitorV2Action(),
RestSearchMonitorV2Action(settings, clusterService),
RestGetAlertsV2Action(),
RestScheduledJobStatsV2Handler()
)
}

Expand Down Expand Up @@ -278,6 +294,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(GetMonitorV2Action.INSTANCE, TransportGetMonitorV2Action::class.java),
ActionPlugin.ActionHandler(SearchMonitorV2Action.INSTANCE, TransportSearchMonitorV2Action::class.java),
ActionPlugin.ActionHandler(DeleteMonitorV2Action.INSTANCE, TransportDeleteMonitorV2Action::class.java),
ActionPlugin.ActionHandler(ExecuteMonitorV2Action.INSTANCE, TransportExecuteMonitorV2Action::class.java),
ActionPlugin.ActionHandler(GetAlertsV2Action.INSTANCE, TransportGetAlertsV2Action::class.java)
)
}

Expand Down Expand Up @@ -314,6 +332,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
val settings = environment.settings()
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
alertV2Indices = AlertV2Indices(settings, client, threadPool, clusterService)
val alertService = AlertService(client, xContentRegistry, alertIndices)
val triggerService = TriggerService(scriptService)
runner = MonitorRunnerService
Expand All @@ -325,6 +344,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerAlertV2Indices(alertV2Indices)
.registerInputService(
InputService(
client,
Expand All @@ -351,6 +371,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
alertV2Mover = AlertV2Mover(environment.settings(), client, threadPool, clusterService, xContentRegistry)
this.threadPool = threadPool
this.clusterService = clusterService

Expand Down Expand Up @@ -378,6 +399,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
alertV2Mover,
lockService,
alertService,
triggerService
Expand Down Expand Up @@ -475,7 +497,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_V2_QUERY_RESULTS_MAX_SIZE,
AlertingSettings.ALERT_V2_PER_RESULT_TRIGGER_MAX_ALERTS,
AlertingSettings.NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH,
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH,
AlertingV2Settings.ALERTING_V2_ENABLED
)
}

Expand All @@ -494,6 +517,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
SystemIndexDescriptor(ALL_ALERT_INDEX_PATTERN, "Alerting Plugin system index pattern"),
SystemIndexDescriptor(SCHEDULED_JOBS_INDEX, "Alerting Plugin Configuration index"),
SystemIndexDescriptor(ALL_COMMENTS_INDEX_PATTERN, "Alerting Comments system index pattern"),
SystemIndexDescriptor(ALL_ALERT_V2_INDEX_PATTERN, "Alerting V2 Alerts index pattern")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alertsv2.AlertV2Indices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
Expand Down Expand Up @@ -35,6 +36,7 @@ data class MonitorRunnerExecutionContext(
var settings: Settings? = null,
var threadPool: ThreadPool? = null,
var alertIndices: AlertIndices? = null,
var alertV2Indices: AlertV2Indices? = null,
var inputService: InputService? = null,
var triggerService: TriggerService? = null,
var alertService: AlertService? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,28 @@ import org.opensearch.alerting.action.ExecuteMonitorResponse
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.ExecuteWorkflowRequest
import org.opensearch.alerting.action.ExecuteWorkflowResponse
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Action
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Request
import org.opensearch.alerting.actionv2.ExecuteMonitorV2Response
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.alertsv2.AlertV2Indices
import org.opensearch.alerting.alertsv2.AlertV2Mover.Companion.moveAlertV2s
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.modelv2.MonitorV2
import org.opensearch.alerting.modelv2.MonitorV2RunResult
import org.opensearch.alerting.modelv2.PPLSQLMonitor
import org.opensearch.alerting.modelv2.PPLSQLMonitor.Companion.PPL_SQL_MONITOR_TYPE
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.remote.monitors.RemoteDocumentLevelMonitorRunner
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.script.TriggerV2ExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
Expand Down Expand Up @@ -137,6 +147,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerAlertV2Indices(alertV2Indices: AlertV2Indices): MonitorRunnerService {
this.monitorCtx.alertV2Indices = alertV2Indices
return this
}

fun registerInputService(inputService: InputService): MonitorRunnerService {
this.monitorCtx.inputService = inputService
return this
Expand Down Expand Up @@ -316,6 +331,18 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
logger.error("Failed to move active alerts for monitor [${job.id}].", e)
}
}
} else if (job is MonitorV2) {
launch {
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
moveAlertV2s(job.id, job, monitorCtx)
}
}
} catch (e: Exception) {
logger.error("Failed to move active alertV2s for monitorV2 [${job.id}].", e)
}
}
} else {
throw IllegalArgumentException("Invalid job type")
}
Expand All @@ -339,6 +366,15 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
} catch (e: Exception) {
logger.error("Failed to move active alerts for monitor [$jobId].", e)
}
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertV2Indices!!.isAlertV2Initialized()) {
moveAlertV2s(jobId, null, monitorCtx)
}
}
} catch (e: Exception) {
logger.error("Failed to move active alertV2s for monitorV2 [$jobId].", e)
}
}
}

Expand Down Expand Up @@ -408,6 +444,44 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
}
}
}
is MonitorV2 -> {
if (job !is PPLSQLMonitor) {
throw IllegalStateException("Unexpected invalid MonitorV2 type: ${job.javaClass.name}")
}

launch {
var monitorLock: LockModel? = null
try {
monitorLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${monitorLock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing $PPL_SQL_MONITOR_TYPE ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
val executeMonitorV2Request = ExecuteMonitorV2Request(
false,
false,
job.id, // only need to pass in MonitorV2 ID
null, // no need to pass in MonitorV2 object itself
TimeValue(periodEnd.toEpochMilli())
)
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorV2Response> {
monitorCtx.client!!.execute(
ExecuteMonitorV2Action.INSTANCE,
executeMonitorV2Request,
it
)
}
} catch (e: Exception) {
logger.error("MonitorV2 run failed for monitor with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(monitorLock, it) }
logger.debug("lock ${monitorLock?.lockId} released")
}
}
}
else -> {
throw IllegalArgumentException("Invalid job type")
}
Expand All @@ -433,20 +507,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
): MonitorRunResult<*> {
// Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping
// has not been updated.
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
IndexUtils.updateIndexMapping(
ScheduledJob.SCHEDULED_JOBS_INDEX,
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
}

override fun onFailure(t: Exception) {
logger.error("Failed to update config index schema", t)
}
}
)
}
updateAlertingConfigIndexSchema()

if (job is Workflow) {
logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun")
Expand Down Expand Up @@ -539,6 +600,44 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
}
}

// after the above JobRunner interface override runJob calls ExecuteMonitorV2 API,
// the ExecuteMonitorV2 transport action calls this function to call the PPLSQLMonitorRunner,
// where the core PPL/SQL Monitor execution logic resides
suspend fun runJobV2(
monitorV2: MonitorV2,
periodEnd: Instant,
dryrun: Boolean,
manual: Boolean,
transportService: TransportService,
): MonitorV2RunResult<*> {
updateAlertingConfigIndexSchema()

val executionId = "${monitorV2.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
val monitorV2Type = when (monitorV2) {
is PPLSQLMonitor -> PPL_SQL_MONITOR_TYPE
else -> throw IllegalStateException("Unexpected MonitorV2 type: ${monitorV2.javaClass.name}")
}

logger.info(
"Executing scheduled monitor v2 - id: ${monitorV2.id}, type: $monitorV2Type, " +
"periodEnd: $periodEnd, dryrun: $dryrun, manual: $manual, executionId: $executionId"
)

// for now, always call PPLSQLMonitorRunner since only PPL Monitors are initially supported
// to introduce new MonitorV2 type, create its MonitorRunner, and if/else branch
// to the corresponding MonitorRunners based on type. For now, default to PPLSQLMonitorRunner
val runResult = PPLSQLMonitorRunner.runMonitorV2(
monitorV2,
monitorCtx,
periodEnd,
dryrun,
manual,
executionId = executionId,
transportService = transportService,
)
return runResult
}

// TODO: See if we can move below methods (or few of these) to a common utils
internal fun getRolesForMonitor(monitor: Monitor): List<String> {
/*
Expand Down Expand Up @@ -582,4 +681,27 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

internal fun compileTemplateV2(template: Script, ctx: TriggerV2ExecutionContext): String {
return monitorCtx.scriptService!!.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

private fun updateAlertingConfigIndexSchema() {
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
IndexUtils.updateIndexMapping(
ScheduledJob.SCHEDULED_JOBS_INDEX,
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
}

override fun onFailure(t: Exception) {
logger.error("Failed to update config index schema", t)
}
}
)
}
}
}
Loading
Loading