diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 86ab0e050..f5dfab487 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -234,7 +234,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) - destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool) + destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices) this.threadPool = threadPool this.clusterService = clusterService return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt index a806e3aeb..5cb37c519 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.ClusterChangedEvent @@ -24,7 +25,8 @@ import kotlin.coroutines.CoroutineContext class DestinationMigrationCoordinator( private val client: Client, private val clusterService: ClusterService, - private val threadPool: ThreadPool + private val threadPool: ThreadPool, + private val scheduledJobIndices: ScheduledJobIndices ) : ClusterStateListener, CoroutineScope, LifecycleListener() { private val logger = LogManager.getLogger(javaClass) @@ -66,6 +68,12 @@ class DestinationMigrationCoordinator( } private fun initMigrateDestinations() { + if (!scheduledJobIndices.scheduledJobIndexExists()) { + logger.debug("Alerting config index is not initialized") + scheduledMigration?.cancel() + return + } + if (!clusterService.state().nodes().isLocalNodeElectedMaster) { scheduledMigration?.cancel() return