Provide API to get the last successful run of a Task #3080

Closed
nathanielc opened this issue Aug 5, 2020 · 13 comments · Fixed by #3144

Comments

@nathanielc
Contributor

A common use case for a task is to process data on a schedule. Currently tasks are scheduled at regular intervals, so if a task run fails there is a gap in whatever the task processes. As a user I want to easily write a task that can start wherever the previous successful run finished.

I would anticipate that the solution to this would be to add a function to a new package influxdata/tasks like:

lastRun : (taskID: string) => time

I would also anticipate that this function uses findRecord to check the _tasks bucket for the runs measurement to produce the result.

Questions:
Q: Should the time returned by this function be the start or finish time of the task?
A: I would expect it to be the start time. The duration of the task run should not affect what data the task processed, so the start time is the more useful value.
Q: Should the time returned account for the offset the task uses?
A: I am not sure about this one. Perhaps we should write out pseudocode for both methods for tasks that use offset and see if one presents itself as the easier, more intuitive solution.
Q: Should we use an option for the name of the tasks bucket?
A: Probably, follow the pattern in the influxdata/monitor package.
Q: Since this code will likely require the use of tableFind/findRecord from a task, do we need to block on or otherwise coordinate with the issue to fix those functions in tasks (influxdata/influxdb#18922)?
A: We most likely do not need to block on the issue, but we should coordinate so that it is fixed before we advertise this behavior to users, as it won't work until then. Once that fix lands we should ensure that a task integration test exists that verifies this behavior, but that is part of another issue.
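
One way to approach the offset question is to write both methods out. The sketch below is plain Python rather than Flux, and every name and value in it is hypothetical. It assumes a task with a 10 minute offset, and it assumes the offset only delays execution without shifting the window the task is meant to cover.

```python
from datetime import datetime, timedelta, timezone

OFFSET = timedelta(minutes=10)  # hypothetical task offset


def range_start_without_offset(last_run: datetime) -> datetime:
    """Method 1: lastRun returns the scheduled time (offset not applied).

    The value can be used as the range start directly.
    """
    return last_run


def range_start_with_offset(last_run: datetime, offset: timedelta) -> datetime:
    """Method 2: lastRun returns the time the run actually started (offset applied).

    The caller has to subtract the offset to recover the window boundary.
    """
    return last_run - offset


scheduled = datetime(2020, 8, 5, 12, 0, tzinfo=timezone.utc)
executed = scheduled + OFFSET  # when the run really starts

# Both methods recover the same boundary, but method 2 needs the task
# author to know the offset and repeat it inside the query.
start_1 = range_start_without_offset(scheduled)
start_2 = range_start_with_offset(executed, OFFSET)
print(start_1.isoformat(), start_2.isoformat())
```

Under these assumptions the first method leaves less for the task author to get wrong, since the offset never has to be restated in the query.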

DOD

  • Flux API exists that makes it easy to get the time of the last successful task run.
  • Tests exist that show the API works.
@nathanielc
Contributor Author

Sync up with compute team first to check if they can provide the last successful run time as an option into the task run since they have that information inside the scheduler that submits the task to run.

I am hesitant to put dynamic information in the task option, because it's currently a record of truth for metadata about the task, but perhaps a new option is appropriate.

What do we do if it's the first time a task runs?

@barbaranelson

This would also be very useful for a Telegraf use case, where we are running a Telegraf input plugin as a flux.from function. We are polling a cloud-based data source to pull in metrics, and when the task runs, we need some way to determine where our next poll interval should start from.

Ideally, we would be able to pass state from one task run to the next (e.g. the previous run picked up records 1234 through 2345, so the next run should start with record 2346). If that can't be easily added, we can live with getting the time, and we'll derive the offsets from that.

@aanthony1243
Contributor

@nathanielc we will not modify the saved query, but rather inject this value at runtime. I'm not sure how we could/should communicate this option to the user so that they can use it correctly.

Whatever we would do the first time a task runs would have the same problem in a Flux-based stdlib solution -- what to do if the query comes back empty? I guess the Flux solution gives the user control to decide what to do about setting a default. For the task-option version, we would need to set some default assumption in the task executor code.

@nathanielc
Contributor Author

@aanthony1243 Yeah, it makes sense that you would not change the saved query. For that reason I think we should use a new option, so that it's not confusing that the value is different at runtime from when you save the query.

Thinking on this a bit more, I am beginning to dislike the option idea, as it means that the Flux script can't run unless it's run from within the task scheduler env. For example, you wouldn't be able to copy/paste the task into the data explorer to give it a quick run without first rewriting away uses of the last run time.

I am going to think on this a bit more. Open to any thoughts from others.

@adrian-thurston
Contributor

I think what @barbaranelson asked for will get asked for again by users. If I can keep a time to track my progress moving through an external data source, why can't I have integers, strings, etc? For example, most recent row-id from postgres, or filename+offset in log file, or UID from IMAP server, etc. Having to map from last run time to a progress mark in some other source could be a pain.
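
To make the idea of a non-time progress mark concrete, here is a rough Python sketch. Nothing in it is an existing Flux or InfluxDB API: `CheckpointStore`, `run_once`, and the task name are all made up, and the store is just an in-memory dict standing in for whatever would carry state between runs.

```python
from typing import Dict, List, Optional, Union

# Hypothetical progress markers: a row id, a log position, a UID, and so on.
Marker = Union[int, str]


class CheckpointStore:
    """In-memory stand-in for state carried from one task run to the next."""

    def __init__(self) -> None:
        self._markers: Dict[str, Marker] = {}

    def load(self, task_id: str, default: Optional[Marker] = None) -> Optional[Marker]:
        # Return the saved marker, or the default if the task never saved one.
        return self._markers.get(task_id, default)

    def save(self, task_id: str, marker: Marker) -> None:
        self._markers[task_id] = marker


store = CheckpointStore()


def run_once(task_id: str, source_rows: List[int]) -> List[int]:
    """Process rows after the saved row id, then advance the marker."""
    last_id = store.load(task_id, default=0)
    new_rows = [row for row in source_rows if row > last_id]
    if new_rows:
        store.save(task_id, max(new_rows))
    return new_rows


first = run_once("poll-postgres", [1234, 1235, 2345])
second = run_once("poll-postgres", [1234, 1235, 2345, 2346, 2347])
print(first, second)
```

The second call only returns the rows past the saved id, which is the behavior a last-run time would otherwise have to be mapped onto.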

@rickspencer3
Contributor

User requirements are that they can reprocess for all time. For example, it may be up to a week. This is meaningful because the _tasks bucket has only 3 days of retention.

@nathanielc
Contributor Author

After getting clarity on the user requirement that the last successful run time could be a long time in the past, it's clear that trying to use the _tasks bucket is not a good solution.

Users will also expect to be able to copy and paste a task into the data explorer to iterate on it, for example to turn on a profiler and optimize the structure of the task. This means that injecting an option dynamically from the task subsystem is not a good solution either, as the Flux code becomes dependent on that option existing.

To me this means we should do the following:

Add a new package influxdata/tasks that has a builtin function lastSuccess that can make a call to the task service API to get the last successful run time for a task. This has the advantage that it works outside the context of the task subsystem, so it can be run from the data explorer. It also has the advantage that it can get access to the last run time even if it's outside the retention window of the _tasks bucket.

Additionally we need to define the behavior of such a function in the case that there was no previous successful run. For that case we should provide a time argument that is returned instead, which defaults to -inf, i.e. the oldest time we can represent.

The function signature would look like this:

package tasks

builtin lastSuccess : (task: string, ?missingTime: A) -> time where A: Timeable

The missingTime optional argument defaults to the oldest time if the last successful run time is missing. As an aside we probably want a better name than missingTime for that parameter.

With this API the usage would look like this:

import "influxdata/tasks"

option task = {...}

from()
    |> range(start: tasks.lastSuccess(task: task.name))
    |> filter(...)
    // ...

The behavior will be that the first run of the task runs against all data in the bucket. Each subsequent run will then pick up where the previous one left off.

Alternatively the user could do this:

import "influxdata/tasks"

option task = {...}

from()
    |> range(start: tasks.lastSuccess(task: task.name, missingTime: -6mo))
    |> filter(...)
    // ...

In this case the first time the task runs it will cover only the past 6 months, and each subsequent run will pick up where the previous one left off.
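
The run-to-run behavior described above can be modeled outside of Flux. The following Python sketch is only an illustration of the proposed semantics: `FakeTaskService` and `run_task` are invented names, the service is an in-memory dict, and the fallback is an absolute time rather than a relative duration like -6mo.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Tuple

# Stand-in for "the oldest time we can represent".
OLDEST = datetime.min.replace(tzinfo=timezone.utc)


class FakeTaskService:
    """Hypothetical in-memory task service that remembers the last success."""

    def __init__(self) -> None:
        self._last_success: Dict[str, datetime] = {}

    def record_success(self, task: str, run_time: datetime) -> None:
        self._last_success[task] = run_time

    def last_success(self, task: str, missing_time: datetime = OLDEST) -> datetime:
        # Fall back to missing_time when the task has never succeeded.
        return self._last_success.get(task, missing_time)


def run_task(service: FakeTaskService, task: str, now: datetime,
             succeed: bool, missing_time: datetime = OLDEST) -> Tuple[datetime, datetime]:
    """Return the (start, stop) window one run would query."""
    window = (service.last_success(task, missing_time), now)
    if succeed:
        # Only a successful run moves the marker forward.
        service.record_success(task, now)
    return window


service = FakeTaskService()
t0 = datetime(2020, 8, 5, 12, 0, tzinfo=timezone.utc)
hour = timedelta(hours=1)

w1 = run_task(service, "downsample", t0, succeed=True)             # first run
w2 = run_task(service, "downsample", t0 + hour, succeed=False)     # this run fails
w3 = run_task(service, "downsample", t0 + 2 * hour, succeed=True)  # next run recovers
for start, stop in (w1, w2, w3):
    print(start.isoformat(), "->", stop.isoformat())
```

In this model the run after a failure starts from the last success rather than from the failed run, so the failed hour is reprocessed instead of being skipped.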

@nathanielc
Contributor Author

@aanthony1243 If we use the TaskService API to find the task by its name, can we use the LatestCompleted field on the task for the last successful run time? Or is that the last time it ran regardless of its status?

@nathanielc
Contributor Author

@aanthony1243 A follow-up question: is the time that is stored for LatestCompleted the now time of the previous run or the wall clock time for when it finished? We need to make sure we do not leave small gaps in these time markers that are dependent on the runtime of the task itself.
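
To illustrate the kind of gap in question, here is a small Python model. It is a sketch under stated assumptions, not a description of how the task service behaves: the 30 second run duration and the point timestamps are made up, and each run is assumed to query a half-open window [start, stop) whose stop is the run's own now time.

```python
from datetime import datetime, timedelta, timezone


def points_in(points, start, stop):
    """Points a run sees when it queries the half-open window [start, stop)."""
    return [p for p in points if start <= p < stop]


t0 = datetime(2020, 8, 19, 12, 0, tzinfo=timezone.utc)
hour = timedelta(hours=1)
run_duration = timedelta(seconds=30)  # hypothetical time the first run takes

# The second point arrives while the first run is still executing.
points = [
    t0 - timedelta(minutes=5),
    t0 + timedelta(seconds=10),
    t0 + timedelta(minutes=20),
]

# The first run queries up to its own now time (t0).
first_run = points_in(points, t0 - hour, t0)

# Marker is the now time of the previous run: the next window starts at t0.
next_from_now = points_in(points, t0, t0 + hour)

# Marker is the wall clock finish time: the next window starts 30s late.
next_from_finish = points_in(points, t0 + run_duration, t0 + hour)

seen_with_now = first_run + next_from_now
seen_with_finish = first_run + next_from_finish
missed = [p for p in points if p not in seen_with_finish]
print(len(seen_with_now), len(seen_with_finish), missed)
```

With the now time as the marker every point lands in exactly one window, while the wall clock marker drops the point written during the first run.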

@adrian-thurston adrian-thurston self-assigned this Aug 19, 2020
@rockstar
Contributor

We chatted about this yesterday with the query team. What we're going to do is create a tasks package with a lastSuccess function that returns the last successful time of the task execution or a default specified as a parameter. The last successful time will be injected as an option by the query team.

@rockstar rockstar self-assigned this Aug 19, 2020
@adrian-thurston
Contributor

@rockstar I picked this up today but don't have any concrete progress and my day is about done so it's in your hands

@nathanielc
Contributor Author

We would like to feature flag this change and get feedback from specific customers wanting to use it before opening it to a wider audience. I think that means we should land the package in the experimental parent namespace. Thoughts on the best way to feature flag this?

@adrian-thurston
Contributor

A generic environment variable setup for flux would be super helpful here. The flux library could just look into the environment and if the setting is there return it, otherwise return the default.

(aside) I think now could be refactored to be part of the environment.

If it was a generic key-value store (like a process's) we wouldn't have to touch any code to pass new items down.
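
As a rough picture of that key-value idea, written in Python rather than Flux: the `lookup` helper, the key names, and the timestamps below are all hypothetical and only show the "return the setting if present, otherwise the default" behavior.

```python
from typing import Any, Mapping


def lookup(env: Mapping[str, Any], key: str, default: Any) -> Any:
    """Return the value the host put in the environment, or the caller's default."""
    return env.get(key, default)


# Hypothetical environments: one built by a task executor, one by an ad hoc query.
task_env = {"lastSuccess": "2020-08-24T00:00:00Z", "now": "2020-08-25T00:00:00Z"}
explorer_env = {"now": "2020-08-25T00:05:00Z"}

fallback = "1970-01-01T00:00:00Z"
in_task = lookup(task_env, "lastSuccess", default=fallback)
in_explorer = lookup(explorer_env, "lastSuccess", default=fallback)
print(in_task, in_explorer)
```

The same script gets the injected value inside the task executor and the default everywhere else, so new items could be passed down without a code change for each one.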

rockstar added a commit to rockstar/flux that referenced this issue Aug 25, 2020
This patch adds a `lastSuccess` function that can be used to return the
last successful time a task ran. This patch only provides the interface,
and will error if called directly, and will be overridden/implemented
on the server first.

Fixes influxdata#3080
rockstar added a commit that referenced this issue Aug 25, 2020
This patch adds a `lastSuccess` function that can be used to return the
last successful time a task ran. This patch only provides the interface,
and will error if called directly, and will be overridden/implemented
on the server first.

Fixes #3080