Skip to content

Commit ab9850b

Browse files
authored
[ML] renamed DatafeedManager to DatafeedRunner (#74082)
1 parent 0118341 commit ab9850b

File tree

10 files changed

+112
-112
lines changed

10 files changed

+112
-112
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.elasticsearch.license.LicenseStateListener;
1414
import org.elasticsearch.license.XPackLicenseState;
1515
import org.elasticsearch.threadpool.ThreadPool;
16-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
16+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
1717
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
1818

1919
public class InvalidLicenseEnforcer implements LicenseStateListener {
@@ -22,16 +22,16 @@ public class InvalidLicenseEnforcer implements LicenseStateListener {
2222

2323
private final ThreadPool threadPool;
2424
private final XPackLicenseState licenseState;
25-
private final DatafeedManager datafeedManager;
25+
private final DatafeedRunner datafeedRunner;
2626
private final AutodetectProcessManager autodetectProcessManager;
2727

2828
private volatile boolean licenseStateListenerRegistered;
2929

3030
InvalidLicenseEnforcer(XPackLicenseState licenseState, ThreadPool threadPool,
31-
DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) {
31+
DatafeedRunner datafeedRunner, AutodetectProcessManager autodetectProcessManager) {
3232
this.threadPool = threadPool;
3333
this.licenseState = licenseState;
34-
this.datafeedManager = datafeedManager;
34+
this.datafeedRunner = datafeedRunner;
3535
this.autodetectProcessManager = autodetectProcessManager;
3636
}
3737

@@ -59,7 +59,7 @@ public void onFailure(Exception e) {
5959

6060
@Override
6161
protected void doRun() throws Exception {
62-
datafeedManager.stopAllDatafeedsOnThisNode("invalid license");
62+
datafeedRunner.stopAllDatafeedsOnThisNode("invalid license");
6363
autodetectProcessManager.closeAllJobsOnThisNode("invalid license");
6464
}
6565
});

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@
258258
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
259259
import org.elasticsearch.xpack.ml.datafeed.DatafeedContextProvider;
260260
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
261-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
261+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
262262
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
263263
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
264264
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@@ -543,7 +543,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
543543

544544
private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
545545
private final SetOnce<DatafeedConfigProvider> datafeedConfigProvider = new SetOnce<>();
546-
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
546+
private final SetOnce<DatafeedRunner> datafeedRunner = new SetOnce<>();
547547
private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
548548
private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
549549
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
@@ -774,9 +774,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
774774
clusterService.getNodeName());
775775
DatafeedContextProvider datafeedContextProvider = new DatafeedContextProvider(jobConfigProvider, datafeedConfigProvider,
776776
jobResultsProvider);
777-
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
777+
DatafeedRunner datafeedRunner = new DatafeedRunner(threadPool, client, clusterService, datafeedJobBuilder,
778778
System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager, datafeedContextProvider);
779-
this.datafeedManager.set(datafeedManager);
779+
this.datafeedRunner.set(datafeedRunner);
780780

781781
// Inference components
782782
final TrainedModelStatsService trainedModelStatsService = new TrainedModelStatsService(resultsPersisterService,
@@ -823,7 +823,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
823823
this.memoryTracker.set(memoryTracker);
824824
MlLifeCycleService mlLifeCycleService =
825825
new MlLifeCycleService(
826-
clusterService, datafeedManager, mlController, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
826+
clusterService, datafeedRunner, mlController, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
827827
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
828828
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);
829829

@@ -832,7 +832,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
832832
clusterService.addListener(mlAutoUpdateService);
833833
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
834834
final InvalidLicenseEnforcer enforcer =
835-
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
835+
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedRunner, autodetectProcessManager);
836836
enforcer.listenForLicenseStateChanges();
837837

838838
// Perform node startup operations
@@ -852,7 +852,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
852852
autodetectProcessManager,
853853
new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
854854
jobDataCountsPersister,
855-
datafeedManager,
855+
datafeedRunner,
856856
anomalyDetectionAuditor,
857857
dataFrameAnalyticsAuditor,
858858
inferenceAuditor,
@@ -886,7 +886,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
886886
memoryTracker.get(),
887887
client,
888888
expressionResolver),
889-
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get(), expressionResolver),
889+
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedRunner.get(), expressionResolver),
890890
new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings,
891891
client,
892892
clusterService,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import org.elasticsearch.cluster.service.ClusterService;
1010
import org.elasticsearch.common.component.LifecycleListener;
11-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
11+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
1212
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
1313
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
1414
import org.elasticsearch.xpack.ml.process.MlController;
@@ -19,16 +19,16 @@
1919

2020
public class MlLifeCycleService {
2121

22-
private final DatafeedManager datafeedManager;
22+
private final DatafeedRunner datafeedRunner;
2323
private final MlController mlController;
2424
private final AutodetectProcessManager autodetectProcessManager;
2525
private final DataFrameAnalyticsManager analyticsManager;
2626
private final MlMemoryTracker memoryTracker;
2727

28-
MlLifeCycleService(ClusterService clusterService, DatafeedManager datafeedManager, MlController mlController,
28+
MlLifeCycleService(ClusterService clusterService, DatafeedRunner datafeedRunner, MlController mlController,
2929
AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
3030
MlMemoryTracker memoryTracker) {
31-
this.datafeedManager = Objects.requireNonNull(datafeedManager);
31+
this.datafeedRunner = Objects.requireNonNull(datafeedRunner);
3232
this.mlController = Objects.requireNonNull(mlController);
3333
this.autodetectProcessManager = Objects.requireNonNull(autodetectProcessManager);
3434
this.analyticsManager = Objects.requireNonNull(analyticsManager);
@@ -47,7 +47,7 @@ public synchronized void stop() {
4747
analyticsManager.markNodeAsShuttingDown();
4848
// This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the datafeeds, so they get reassigned.
4949
// We have to do this first, otherwise the datafeeds could fail if they send data to a dead autodetect process.
50-
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
50+
datafeedRunner.isolateAllDatafeedsOnThisNodeBeforeShutdown();
5151
// This kills autodetect processes WITHOUT closing the jobs, so they get reassigned.
5252
autodetectProcessManager.killAllProcessesOnThisNode();
5353
mlController.stop();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
5959
import org.elasticsearch.xpack.ml.MachineLearning;
6060
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
61-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
61+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
6262
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
6363
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
6464
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -417,12 +417,12 @@ private ElasticsearchStatusException createUnknownLicenseError(
417417
}
418418

419419
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
420-
private final DatafeedManager datafeedManager;
420+
private final DatafeedRunner datafeedRunner;
421421
private final IndexNameExpressionResolver resolver;
422422

423-
public StartDatafeedPersistentTasksExecutor(DatafeedManager datafeedManager, IndexNameExpressionResolver resolver) {
423+
public StartDatafeedPersistentTasksExecutor(DatafeedRunner datafeedRunner, IndexNameExpressionResolver resolver) {
424424
super(MlTasks.DATAFEED_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
425-
this.datafeedManager = datafeedManager;
425+
this.datafeedRunner = datafeedRunner;
426426
this.resolver = resolver;
427427
}
428428

@@ -461,8 +461,8 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa
461461
datafeedTask.markAsCompleted();
462462
return;
463463
}
464-
datafeedTask.datafeedManager = datafeedManager;
465-
datafeedManager.run(datafeedTask,
464+
datafeedTask.datafeedRunner = datafeedRunner;
465+
datafeedRunner.run(datafeedTask,
466466
(error) -> {
467467
if (error != null) {
468468
datafeedTask.markAsFailed(error);
@@ -487,7 +487,7 @@ public static class DatafeedTask extends AllocatedPersistentTask implements Star
487487
private final long startTime;
488488
private final Long endTime;
489489
/* only pck protected for testing */
490-
volatile DatafeedManager datafeedManager;
490+
volatile DatafeedRunner datafeedRunner;
491491

492492
DatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.DatafeedParams params,
493493
Map<String, String> headers) {
@@ -530,24 +530,24 @@ public boolean shouldCancelChildrenOnCancellation() {
530530
}
531531

532532
public void stop(String reason, TimeValue timeout) {
533-
if (datafeedManager != null) {
534-
datafeedManager.stopDatafeed(this, reason, timeout);
533+
if (datafeedRunner != null) {
534+
datafeedRunner.stopDatafeed(this, reason, timeout);
535535
}
536536
}
537537

538538
public void isolate() {
539-
if (datafeedManager != null) {
540-
datafeedManager.isolateDatafeed(getAllocationId());
539+
if (datafeedRunner != null) {
540+
datafeedRunner.isolateDatafeed(getAllocationId());
541541
}
542542
}
543543

544544
public Optional<GetDatafeedRunningStateAction.Response.RunningState> getRunningState() {
545-
if (datafeedManager == null) {
545+
if (datafeedRunner == null) {
546546
return Optional.empty();
547547
}
548548
return Optional.of(new GetDatafeedRunningStateAction.Response.RunningState(
549549
this.endTime == null,
550-
datafeedManager.finishedLookBack(this)
550+
datafeedRunner.finishedLookBack(this)
551551
));
552552
}
553553
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java renamed to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedRunner.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@
5353
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
5454
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
5555

56-
public class DatafeedManager {
56+
public class DatafeedRunner {
5757

58-
private static final Logger logger = LogManager.getLogger(DatafeedManager.class);
58+
private static final Logger logger = LogManager.getLogger(DatafeedRunner.class);
5959

6060
private final Client client;
6161
private final ClusterService clusterService;
@@ -69,9 +69,9 @@ public class DatafeedManager {
6969
private final AutodetectProcessManager autodetectProcessManager;
7070
private final DatafeedContextProvider datafeedContextProvider;
7171

72-
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
73-
Supplier<Long> currentTimeSupplier, AnomalyDetectionAuditor auditor,
74-
AutodetectProcessManager autodetectProcessManager, DatafeedContextProvider datafeedContextProvider) {
72+
public DatafeedRunner(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
73+
Supplier<Long> currentTimeSupplier, AnomalyDetectionAuditor auditor,
74+
AutodetectProcessManager autodetectProcessManager, DatafeedContextProvider datafeedContextProvider) {
7575
this.client = Objects.requireNonNull(client);
7676
this.clusterService = Objects.requireNonNull(clusterService);
7777
this.threadPool = Objects.requireNonNull(threadPool);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.Map;
2929

3030
import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
31-
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
31+
import static org.elasticsearch.xpack.ml.datafeed.DatafeedRunnerTests.createDatafeedConfig;
3232
import static org.hamcrest.Matchers.contains;
3333
import static org.hamcrest.Matchers.equalTo;
3434
import static org.hamcrest.Matchers.nullValue;

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
2222
import org.elasticsearch.xpack.core.ml.job.config.Job;
2323
import org.elasticsearch.xpack.core.ml.job.config.JobState;
24-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
25-
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
24+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
25+
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunnerTests;
2626
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
2727

2828
import java.util.Arrays;
@@ -50,37 +50,37 @@ protected NamedXContentRegistry xContentRegistry() {
5050
}
5151

5252
public void testValidate_jobClosed() {
53-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
53+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
5454
PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder().build();
55-
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
55+
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
5656
Exception e = expectThrows(ElasticsearchStatusException.class,
5757
() -> TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry()));
5858
assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed"));
5959
}
6060

6161
public void testValidate_jobOpening() {
62-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
62+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
6363
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
6464
addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder);
6565
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
66-
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
66+
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
6767

6868
TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry());
6969
}
7070

7171
public void testValidate_jobOpened() {
72-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
72+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
7373
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
7474
addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder);
7575
PersistentTasksCustomMetadata tasks = tasksBuilder.build();
76-
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
76+
DatafeedConfig datafeedConfig1 = DatafeedRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
7777

7878
TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks, xContentRegistry());
7979
}
8080

8181
public void testDeprecationsLogged() {
82-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
83-
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
82+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
83+
DatafeedConfig.Builder datafeedConfig = DatafeedRunnerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
8484
DatafeedConfig config = spy(datafeedConfig.build());
8585
doReturn(Collections.singletonList("Deprecated Agg")).when(config).getAggDeprecations(any(NamedXContentRegistry.class));
8686
doReturn(Collections.singletonList("Deprecated Query")).when(config).getQueryDeprecations(any(NamedXContentRegistry.class));
@@ -94,8 +94,8 @@ public void testDeprecationsLogged() {
9494
}
9595

9696
public void testNoDeprecationsLogged() {
97-
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
98-
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
97+
Job job1 = DatafeedRunnerTests.createDatafeedJob().build(new Date());
98+
DatafeedConfig.Builder datafeedConfig = DatafeedRunnerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
9999
DatafeedConfig config = spy(datafeedConfig.build());
100100
doReturn(Collections.emptyList()).when(config).getAggDeprecations(any(NamedXContentRegistry.class));
101101
doReturn(Collections.emptyList()).when(config).getQueryDeprecations(any(NamedXContentRegistry.class));
@@ -160,10 +160,10 @@ public void testRemoteClusterVersionCheck() {
160160
public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
161161
TaskId parentTaskId,
162162
StartDatafeedAction.DatafeedParams params,
163-
DatafeedManager datafeedManager) {
163+
DatafeedRunner datafeedRunner) {
164164
TransportStartDatafeedAction.DatafeedTask task = new TransportStartDatafeedAction.DatafeedTask(id, type, action, parentTaskId,
165165
params, Collections.emptyMap());
166-
task.datafeedManager = datafeedManager;
166+
task.datafeedRunner = datafeedRunner;
167167
return task;
168168
}
169169
}

0 commit comments

Comments
 (0)