Provide API to get the last successful run of a Task #3080
Comments
Sync up with the compute team first to check if they can provide the last successful run time as an option into the task run, since they have that information inside the scheduler that submits the task to run. I am hesitant to put dynamic information in the saved query. What do we do if it's the first time a task runs?
This would also be very useful for a Telegraf use case, where we are running a Telegraf input plugin as a `flux.from` function. We are polling a cloud-based data source to pull in metrics, and when the task runs, we need some way to determine where our next poll interval should start from. Ideally, we would be able to pass state from one task run to the next (e.g. the previous run picked up records 1234 through 2345, so the next run should start with record 2346). If that can't be easily added, we can live with getting the time, and we'll derive the offsets from that.
@nathanielc we will not modify the saved query, but rather inject this value at runtime. I'm not sure how we could/should communicate this option to the user so that they can use it correctly. Whatever we would do the first time a task runs would have the same problem in a Flux-based stdlib solution: what do we do if the query comes back empty? I guess the Flux solution gives the user control to decide what to do about setting a default. For the task-option version, we would need to set some default assumption in the task executor code.
@aanthony1243 Yeah, it makes sense that you would not change the saved query. For that reason I think we should use a new option, so that it's not confusing that its runtime value differs from when you saved the query. Thinking on this a bit more, I am beginning to dislike the option idea, as it means the Flux script can't run unless it's run from within the task scheduler environment. For example, you wouldn't be able to copy/paste the task into the data explorer to give it a quick run without first rewriting away uses of the last run time. I am going to think on this a bit more. Open to any thoughts from others.
I think what @barbaranelson asked for will get asked for again by users. If I can keep a time to track my progress moving through an external data source, why can't I have integers, strings, etc? For example, most recent row-id from postgres, or filename+offset in log file, or UID from IMAP server, etc. Having to map from last run time to a progress mark in some other source could be a pain. |
User requirements are that they can reprocess over a long window; for example, it may be up to a week. This is meaningful because the `_tasks` bucket has only 3 days of retention.
After getting clarity on the user requirement that the last successful run time could be a long time in the past, it's clear that trying to use the `_tasks` bucket directly will not work, given its short retention. Users will also expect to be able to copy/paste a task into the data explorer to iterate on it, for example to turn on a profiler and optimize the structure of the task. This means that injecting an option dynamically from the task subsystem is not a good solution either, as the Flux code becomes dependent on that option existing. To me this means we should do the following: add a new package with a function that returns the last successful run time. Additionally, we need to define the behavior of such a function in the case that there was no previous successful run. For that case we should provide a time argument that is returned as the default. The function signature would look like this:
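A hedged sketch of the proposed signature (the package path `influxdata/tasks` and the parameter name `orTime` follow this discussion; neither is a confirmed API at this point):

```flux
import "influxdata/tasks"

// lastSuccess returns the start time of this task's most recent
// successful run. If the task has never completed successfully,
// the orTime argument is returned instead.
//
//   tasks.lastSuccess(orTime: 2020-01-01T00:00:00Z)
```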
With this API the usage would look like this:
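A minimal usage sketch, assuming the proposed `tasks.lastSuccess` function; the bucket names and task options here are illustrative only:

```flux
import "influxdata/tasks"

option task = {name: "downsample", every: 1h}

from(bucket: "example-bucket")
    // On the first run there is no previous success, so fall back to
    // the epoch and process all data in the bucket.
    |> range(start: tasks.lastSuccess(orTime: 1970-01-01T00:00:00Z))
    |> aggregateWindow(every: 1m, fn: mean)
    |> to(bucket: "example-downsampled")
```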
The behavior will be that for the first run of the task the task runs against all data in the bucket. Then each subsequent task run will pick up where the previous left off. Alternatively the user could do this:
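Sketched with the same assumed function, but bounding the first run to the past six months instead of all time:

```flux
import "influxdata/tasks"

option task = {name: "downsample", every: 1h}

from(bucket: "example-bucket")
    // First run: no previous success, so start 6 months back.
    // Later runs: start from the last successful run time.
    |> range(start: tasks.lastSuccess(orTime: -6mo))
    |> aggregateWindow(every: 1m, fn: mean)
    |> to(bucket: "example-downsampled")
```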
In this case, the first time the task runs it will run for only the past 6 months, and then each subsequent task run will pick up where the previous left off.
@aanthony1243 If we use the TaskService API to find the task by its name, can we use the LatestCompleted field on the task for the last successful run time? Or is that the last time it ran regardless of its status?
@aanthony1243 A follow-up question: is the time stored in LatestCompleted the now time of the previous run, or the wall-clock time for when it finished? We need to make sure we do not leave small gaps in these time markers that are dependent on the runtime of the task itself.
We chatted about this yesterday with the query team. What we're going to do is create a new package that exposes this function.
@rockstar I picked this up today but don't have any concrete progress and my day is about done so it's in your hands |
We would like to feature flag this change and get feedback from specific customers wanting to use it before opening it to a wider audience. I think that means we should land the package behind that feature flag.
A generic environment variable setup for Flux would be super helpful here. The Flux library could just look into the environment and, if the setting is there, return it; otherwise return the default. (Aside: if it were a generic key-value store, like a process's environment, we wouldn't have to touch any code to pass new items down.)
This patch adds a `lastSuccess` function that can be used to return the last successful time a task ran. This patch only provides the interface and will error if called directly; it will be overridden/implemented on the server first. Fixes influxdata#3080
A common use case for a task is to process data on a schedule. Currently, tasks are scheduled on regular intervals, so if a previous task run fails there is a gap in whatever the task processes. As a user I want to easily write a task that can start wherever the previous successful task run finished.
I would anticipate that the solution to this would be to add a function to a new package `influxdata/tasks`. I would also anticipate that this function uses `findRecord` to check the `_tasks` bucket for the `runs` measurement to produce the result.

Questions:
Q: Should the time returned by this function be the start or finish time of the task?
A: I would expect it to be the start time; the duration of the task run should not affect what data the task processed, so the start time is the more useful value.
Q: Should the time returned account for the offset the task uses?
A: This one I am not sure about; perhaps we should write out pseudocode for both methods for tasks that use offset and see if one presents itself as the easier, more intuitive solution.
Q: Should we use an option for the name of the tasks bucket?
A: Probably; follow the pattern in the `influxdata/monitor` package.
Q: Since this code will likely require the use of `tableFind`/`findRecord` from a task, do we need to block on, or otherwise coordinate with, the issue to fix those functions in tasks (influxdata/influxdb#18922)?
A: We most likely do not need to block on the issue, but we should coordinate so that it is fixed before we advertise this behavior to users, as it won't work until then. We should ensure that once that other fix lands, a task integration test exists that verifies this behavior, but that is part of another issue.
DOD