[ML Data Frame] Start directly data frame rather than via the scheduler #42067
Pinging @elastic/ml-core
To your question: if the persistent task is removed, the scheduler is removed as well. So by auto-stopping once the checkpoint is reached, we indirectly remove the scheduler, as far as I know; at least, that's the plan.
return DataFrameTransformTask.SCHEDULE_NAME + "_" + getTransformId();
}

static SchedulerEngine.Schedule next() {
Maybe for a follow-up PR: could this be non-static, so that it can access members and change the interval dynamically, depending on how far the DF has progressed?
I hit a problem in the tests that assert on the indexer state. When the indexer is started, its state goes from STOPPED -> STARTED. With this change, because the indexer is triggered, it should be in state = INDEXING when the start request returns, unless it has finished really quickly. We have a bunch of yml tests that start a data feed then assert state = STARTED. Those tests aren't processing much data, so we expect the state to go from STARTED -> INDEXING -> STARTED pretty quickly; when we check the state, it could be STARTED or INDEXING. I pushed a change to account for this, but what I don't understand is why the tests haven't failed in CI previously, given that the indexer could be either STARTED or INDEXING.
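To illustrate the relaxed check described above, here is a minimal Java sketch of an assertion helper that accepts either state after a start request. The enum and class names are hypothetical stand-ins, not the project's types, and the actual change in this PR was made to the yml tests rather than in Java.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the indexer state; only the states named in this thread are modelled.
enum IndexerStateSketch { STOPPED, STARTED, INDEXING }

// Hypothetical helper: after start() returns, the indexer may still be running
// (INDEXING) or may already have finished its first run (STARTED).
final class StartStateCheck {
    private static final Set<IndexerStateSketch> ACCEPTABLE_AFTER_START =
        EnumSet.of(IndexerStateSketch.STARTED, IndexerStateSketch.INDEXING);

    private StartStateCheck() {}

    static boolean isAcceptableAfterStart(IndexerStateSketch observed) {
        return ACCEPTABLE_AFTER_START.contains(observed);
    }
}
```

A test would then assert `StartStateCheck.isAcceptableAfterStart(observedState)` instead of comparing against STARTED alone.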
run elasticsearch-ci/packaging-sample
private SchedulerEngine.Schedule next() {
    return (startTime, now) -> {
        return now + 1000; // to be fixed, hardcode something
Future work will certainly be needed here so that we can adjust for failures, etc.
@@ -207,6 +207,10 @@ public synchronized void start(ActionListener<Response> listener) {
        persistStateToClusterState(state, ActionListener.wrap(
            task -> {
                auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
                long now = System.currentTimeMillis();
                // kick off the indexer
                triggered(new Event(schedulerJobName(), now, now));
It seems to me that once we add a bit more logic to the SchedulerEngine.Schedule, this won't be necessary, because the schedule will know whether it has to trigger right now, later, or not at all. But this is a good stop-gap until we fix the created schedule.
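As a rough illustration of the "right now, later, or not at all" idea, a schedule could look something like the sketch below. `ScheduleSketch` is a hypothetical interface that only copies the `(startTime, now) -> long` shape of the lambda in the diff; the `-1` sentinel for "never", the one-shot immediate first run, and `markFinished()` are assumptions made for this example, not the project's design.

```java
// Hypothetical stand-in with the same (startTime, now) -> long shape as the lambda in the diff.
interface ScheduleSketch {
    long nextScheduledTimeAfter(long startTime, long now);
}

final class AdaptiveScheduleSketch implements ScheduleSketch {
    // Assumption for this sketch: a negative value means "do not trigger again".
    static final long NEVER = -1L;

    private final long intervalMillis;
    private boolean firstRunPending = true;
    private boolean finished = false;

    AdaptiveScheduleSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    @Override
    public synchronized long nextScheduledTimeAfter(long startTime, long now) {
        if (finished) {
            return NEVER;              // not at all
        }
        if (firstRunPending) {
            firstRunPending = false;
            return now;                // right now: the first run after start
        }
        return now + intervalMillis;   // later: regular interval
    }

    // Hypothetical hook, e.g. called once a checkpoint has been reached.
    synchronized void markFinished() {
        finished = true;
    }
}
```

With a schedule of this kind, the explicit `triggered(...)` call in `start` would no longer be needed, at the cost of keeping a little state inside the schedule.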
…er (elastic#42067) Trigger indexer start directly to put the indexer in INDEXING state immediately
Start triggers the indexer directly. The indexer runs in another thread, so this is safe to do from an API request network thread. This PR also moves the scheduled next() method, and the starting of the scheduler, from the task executor into the DF task. This small change means the scheduler does not have to run at a high frequency to reduce latency in starting the data frame.
There is an open question about whether the scheduler should be stopped on finish when a checkpoint is reached.
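The direct-trigger behaviour described above can be sketched in isolation as follows. This is a simplified model, not the actual task: `DirectStartSketch`, its three states, and the single-thread executor standing in for the indexer thread are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DirectStartSketch {
    enum State { STOPPED, STARTED, INDEXING }

    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    // Stand-in for the thread the indexer runs on.
    private final ExecutorService indexerThread = Executors.newSingleThreadExecutor();
    private final Runnable work;

    DirectStartSketch(Runnable work) {
        this.work = work;
    }

    // Marks the task started, then triggers the indexer immediately instead of
    // waiting for the next scheduler tick.
    void start() {
        if (state.compareAndSet(State.STOPPED, State.STARTED)) {
            triggered();
        }
    }

    // Also what a periodic scheduler tick would call; a no-op unless the state is STARTED.
    void triggered() {
        if (state.compareAndSet(State.STARTED, State.INDEXING)) {
            // The calling thread only hands the work off; indexing happens on the indexer thread.
            indexerThread.execute(() -> {
                try {
                    work.run();
                } finally {
                    state.set(State.STARTED);
                }
            });
        }
    }

    State getState() {
        return state.get();
    }

    // Waits for in-flight work to finish and releases the thread.
    void close() {
        indexerThread.shutdown();
        try {
            indexerThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model the state is already INDEXING when `start()` returns and falls back to STARTED once the work completes, which matches the STARTED -> INDEXING -> STARTED sequence discussed in the review comments.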