Skip to content

Commit

Permalink
Skipping destination migration if alerting index is not initialized (o…
Browse files Browse the repository at this point in the history
…pensearch-project#417)

Signed-off-by: Ravi 6005951+thalurur@users.noreply.github.com
Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
  • Loading branch information
thalurur authored and AWSHurneyt committed Apr 21, 2022
1 parent 4405723 commit 148a2e7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
this.threadPool = threadPool
this.clusterService = clusterService
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.ClusterChangedEvent
Expand All @@ -24,7 +25,8 @@ import kotlin.coroutines.CoroutineContext
class DestinationMigrationCoordinator(
private val client: Client,
private val clusterService: ClusterService,
private val threadPool: ThreadPool
private val threadPool: ThreadPool,
private val scheduledJobIndices: ScheduledJobIndices
) : ClusterStateListener, CoroutineScope, LifecycleListener() {

private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -66,6 +68,12 @@ class DestinationMigrationCoordinator(
}

private fun initMigrateDestinations() {
if (!scheduledJobIndices.scheduledJobIndexExists()) {
logger.debug("Alerting config index is not initialized")
scheduledMigration?.cancel()
return
}

if (!clusterService.state().nodes().isLocalNodeElectedMaster) {
scheduledMigration?.cancel()
return
Expand Down

0 comments on commit 148a2e7

Please sign in to comment.