|
5 | 5 |
|
6 | 6 | package org.opensearch.ml.cluster; |
7 | 7 |
|
8 | | -import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX; |
| 8 | +import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX; |
9 | 9 | import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MONITORING_REQUEST_COUNT; |
10 | 10 |
|
11 | 11 | import java.util.List; |
@@ -39,6 +39,7 @@ public class MLCommonsClusterEventListener implements ClusterStateListener { |
39 | 39 | private final MLModelAutoReDeployer mlModelAutoReDeployer; |
40 | 40 | private final Client client; |
41 | 41 | private final MLFeatureEnabledSetting mlFeatureEnabledSetting; |
| 42 | + private boolean startedStatsJob; |
42 | 43 |
|
43 | 44 | public MLCommonsClusterEventListener( |
44 | 45 | ClusterService clusterService, |
@@ -90,12 +91,11 @@ public void clusterChanged(ClusterChangedEvent event) { |
90 | 91 | */ |
91 | 92 | for (DiscoveryNode node : state.nodes()) { |
92 | 93 | if (node.isDataNode() && node.getVersion().onOrAfter(Version.V_3_1_0)) { |
93 | | - if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) { |
94 | | - mlTaskManager.indexStatsCollectorJob(true); |
95 | | - } |
96 | | - |
97 | | - if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) { |
98 | | - mlTaskManager.startTaskPollingJob(); |
| 94 | + if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled() |
| 95 | + && !clusterService.state().getMetadata().hasIndex(ML_JOBS_INDEX) && !this.startedStatsJob) { |
| 96 | + mlTaskManager.indexStatsCollectorJob(true); |
| 97 | + // using this variable in case if same node has a cluster state change event and the state is not updated yet |
| 98 | + this.startedStatsJob = true; |
99 | 99 | } |
100 | 100 |
|
101 | 101 | break; |
|
0 commit comments