Skip to content

Commit 426f08b

Browse files
[Refactor] Change stats collector job setting to be dynamic (opensearch-project#4092) (opensearch-project#4094)
(cherry picked from commit b6952b9) Co-authored-by: Pavan Yekbote <pybot@amazon.com>
1 parent b9765b0 commit 426f08b

File tree

12 files changed

+93
-19
lines changed

12 files changed

+93
-19
lines changed

common/src/main/java/org/opensearch/ml/common/settings/MLCommonsSettings.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -349,5 +349,5 @@ private MLCommonsSettings() {}
349349

350350
// Feature flag for enabling telemetry static metric collection job -- MLStatsJobProcessor
351351
public static final Setting<Boolean> ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED = Setting
352-
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Final);
352+
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
353353
}

common/src/main/java/org/opensearch/ml/common/settings/MLFeatureEnabledSetting.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,12 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
8888
clusterService
8989
.getClusterSettings()
9090
.addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED, it -> isRagSearchPipelineEnabled = it);
91+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED, it -> {
92+
isStaticMetricCollectionEnabled = it;
93+
for (SettingsChangeListener listener : listeners) {
94+
listener.onStaticMetricCollectionEnabledChanged(it);
95+
}
96+
});
9197
}
9298

9399
/**

common/src/main/java/org/opensearch/ml/common/settings/SettingsChangeListener.java

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,5 +18,20 @@ public interface SettingsChangeListener {
1818
* <li><code>false</code> if multi-tenancy is disabled</li>
1919
* </ul>
2020
*/
21-
void onMultiTenancyEnabledChanged(boolean isEnabled);
21+
default void onMultiTenancyEnabledChanged(boolean isEnabled) {
22+
// do nothing
23+
}
24+
25+
/**
26+
* Callback method that gets triggered when the static metric collection setting changes.
27+
*
28+
* @param isEnabled A boolean value indicating the new state of the static metric collection setting:
29+
* <ul>
30+
* <li><code>true</code> if static metric collection is enabled</li>
31+
* <li><code>false</code> if static metric collection is disabled</li>
32+
* </ul>
33+
*/
34+
default void onStaticMetricCollectionEnabledChanged(boolean isEnabled) {
35+
// do nothing
36+
}
2237
}

common/src/test/java/org/opensearch/ml/common/settings/MLFeatureEnabledSettingTests.java

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -129,4 +129,19 @@ public void testMultiTenancyChangeNotifiesListeners() {
129129
setting.notifyMultiTenancyListeners(true);
130130
verify(mockListener).onMultiTenancyEnabledChanged(true);
131131
}
132+
133+
@Test
134+
public void testStaticMetricCollectionSettingChangeNotifiesListeners() {
135+
Settings settings = Settings.builder().put("plugins.ml_commons.metrics_static_collection_enabled", false).build();
136+
137+
MLFeatureEnabledSetting setting = new MLFeatureEnabledSetting(mockClusterService, settings);
138+
139+
SettingsChangeListener mockListener = mock(SettingsChangeListener.class);
140+
setting.addListener(mockListener);
141+
142+
mockClusterSettings.applySettings(Settings.builder().put("plugins.ml_commons.metrics_static_collection_enabled", true).build());
143+
144+
verify(mockListener).onStaticMetricCollectionEnabledChanged(true);
145+
assertTrue(setting.isStaticMetricCollectionEnabled());
146+
}
132147
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,7 @@ public void clusterChanged(ClusterChangedEvent event) {
9191
for (DiscoveryNode node : state.nodes()) {
9292
if (node.isDataNode() && Version.V_3_1_0.onOrAfter(node.getVersion())) {
9393
if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) {
94-
mlTaskManager.startStatsCollectorJob();
94+
mlTaskManager.indexStatsCollectorJob(true);
9595
}
9696

9797
if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) {

plugin/src/main/java/org/opensearch/ml/jobs/MLJobParameter.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -49,14 +49,14 @@ public class MLJobParameter implements ScheduledJobParameter {
4949

5050
public MLJobParameter() {}
5151

52-
public MLJobParameter(String name, Schedule schedule, Long lockDurationSeconds, Double jitter, MLJobType jobType) {
52+
public MLJobParameter(String name, Schedule schedule, Long lockDurationSeconds, Double jitter, MLJobType jobType, boolean isEnabled) {
5353
this.jobName = name;
5454
this.schedule = schedule;
5555
this.lockDurationSeconds = lockDurationSeconds;
5656
this.jitter = jitter;
5757

5858
Instant now = Instant.now();
59-
this.isEnabled = true;
59+
this.isEnabled = isEnabled;
6060
this.enabledTime = now;
6161
this.lastUpdateTime = now;
6262
this.jobType = jobType;

plugin/src/main/java/org/opensearch/ml/jobs/MLJobRunner.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -93,6 +93,10 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
9393
throw new IllegalArgumentException("Job parameters is invalid.");
9494
}
9595

96+
if (!jobParameter.isEnabled()) {
97+
throw new IllegalStateException(String.format("Attempted to run disabled job of type: %s", jobParameter.getJobType().name()));
98+
}
99+
96100
switch (jobParameter.getJobType()) {
97101
case STATS_COLLECTOR:
98102
MLStatsJobProcessor

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -632,6 +632,7 @@ public Collection<Object> createComponents(
632632
modelAccessControlHelper = new ModelAccessControlHelper(clusterService, settings);
633633
connectorAccessControlHelper = new ConnectorAccessControlHelper(clusterService, settings);
634634
mlFeatureEnabledSetting = new MLFeatureEnabledSetting(clusterService, settings);
635+
mlFeatureEnabledSetting.addListener(mlTaskManager);
635636
mlModelManager = new MLModelManager(
636637
clusterService,
637638
scriptService,

plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.ml.common.exception.MLException;
5252
import org.opensearch.ml.common.exception.MLLimitExceededException;
5353
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
54+
import org.opensearch.ml.common.settings.SettingsChangeListener;
5455
import org.opensearch.ml.engine.indices.MLIndicesHandler;
5556
import org.opensearch.ml.jobs.MLJobParameter;
5657
import org.opensearch.ml.jobs.MLJobType;
@@ -73,7 +74,7 @@
7374
* MLTaskManager is responsible for managing MLTask.
7475
*/
7576
@Log4j2
76-
public class MLTaskManager {
77+
public class MLTaskManager implements SettingsChangeListener {
7778
public static int TASK_SEMAPHORE_TIMEOUT = 5000; // 5 seconds
7879
private final Map<String, MLTaskCache> taskCaches;
7980
private final Client client;
@@ -553,7 +554,8 @@ public void startTaskPollingJob() {
553554
new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
554555
20L,
555556
null,
556-
MLJobType.BATCH_TASK_UPDATE
557+
MLJobType.BATCH_TASK_UPDATE,
558+
true
557559
);
558560

559561
IndexRequest indexRequest = new IndexRequest()
@@ -562,24 +564,27 @@ public void startTaskPollingJob() {
562564
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
563565
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
564566

565-
startJob(indexRequest, MLJobType.BATCH_TASK_UPDATE, () -> this.taskPollingJobStarted = true);
567+
indexJob(indexRequest, MLJobType.BATCH_TASK_UPDATE, () -> this.taskPollingJobStarted = true);
566568
} catch (IOException e) {
567569
log.error("Failed to index task polling job", e);
568570
}
569571
}
570572

571-
public void startStatsCollectorJob() {
572-
if (statsCollectorJobStarted) {
573-
return;
574-
}
573+
@Override
574+
public void onStaticMetricCollectionEnabledChanged(boolean isEnabled) {
575+
log.info("Static metric collection setting changed to: {}", isEnabled);
576+
indexStatsCollectorJob(isEnabled);
577+
}
575578

579+
public void indexStatsCollectorJob(boolean enabled) {
576580
try {
577581
MLJobParameter jobParameter = new MLJobParameter(
578582
MLJobType.STATS_COLLECTOR.name(),
579583
new IntervalSchedule(Instant.now(), 5, ChronoUnit.MINUTES),
580584
60L,
581585
null,
582-
MLJobType.STATS_COLLECTOR
586+
MLJobType.STATS_COLLECTOR,
587+
enabled
583588
);
584589

585590
IndexRequest indexRequest = new IndexRequest()
@@ -588,7 +593,7 @@ public void startStatsCollectorJob() {
588593
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
589594
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
590595

591-
startJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> this.statsCollectorJobStarted = true);
596+
indexJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> {});
592597
} catch (IOException e) {
593598
log.error("Failed to index stats collection job", e);
594599
}
@@ -601,7 +606,7 @@ public void startStatsCollectorJob() {
601606
* @param jobType the type of job being started
602607
* @param successCallback callback to execute on successful job indexing
603608
*/
604-
private void startJob(IndexRequest indexRequest, MLJobType jobType, Runnable successCallback) {
609+
private void indexJob(IndexRequest indexRequest, MLJobType jobType, Runnable successCallback) {
605610
mlIndicesHandler.initMLJobsIndex(ActionListener.wrap(success -> {
606611
if (success) {
607612
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {

plugin/src/test/java/org/opensearch/ml/jobs/MLJobParameterTests.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ public void setUp() {
3535
lockDurationSeconds = 20L;
3636
jitter = 0.5;
3737
jobType = null;
38-
jobParameter = new MLJobParameter(jobName, schedule, lockDurationSeconds, jitter, jobType);
38+
jobParameter = new MLJobParameter(jobName, schedule, lockDurationSeconds, jitter, jobType, true);
3939
}
4040

4141
@Test
@@ -54,7 +54,7 @@ public void testToXContent() throws Exception {
5454
@Test
5555
public void testNullCase() throws IOException {
5656
String newJobName = "test-job";
57-
MLJobParameter nullParameter = new MLJobParameter(newJobName, null, null, null, null);
57+
MLJobParameter nullParameter = new MLJobParameter(newJobName, null, null, null, null, true);
5858
nullParameter.setLastUpdateTime(null);
5959
nullParameter.setEnabledTime(null);
6060

@@ -64,6 +64,7 @@ public void testNullCase() throws IOException {
6464

6565
assertTrue(jsonString.contains(newJobName));
6666
assertEquals(newJobName, nullParameter.getName());
67+
assertTrue(nullParameter.isEnabled());
6768
assertNull(nullParameter.getSchedule());
6869
assertNull(nullParameter.getLockDurationSeconds());
6970
assertNull(nullParameter.getJitter());

0 commit comments

Comments
 (0)