TaskRunner refactor #260
@cicdw making one more small change here to split out caching from running.
@cicdw ready for review!
I haven't finished a full detailed review but two incredibly superficial / trivial things stood out to me:
I agree with both points.
These could definitely use the word step -- it's a holdover from when they were classes (and just a way to set them apart). Doesn't serve any purpose other than to identify them as having a specific signature. Should I just remove the
f5283e8 to 25d93f7
On further thought, the
Some incredibly minor superficial changes requested; this is a really nice refactor and I really like how this makes the tests very targeted and easy to understand / read. 💯
I also think it's a great sign that no `FlowRunner` or `Flow` tests (other than the one) had to be updated to handle this significant refactor.
CHANGELOG.md
Outdated
@@ -3,7 +3,7 @@
## 0.4.0 <Badge text="alpha" type="warn">
### Major Features

- None
- Refactor `TaskRunner` into a moduler pipeline - [#260](https://github.com/PrefectHQ/prefect/pull/260)
modular (sp)
src/prefect/engine/task_runner.py
Outdated
# check that upstream tasks are finished
# ---------------------------------------------------------
Raises:
- signals.ENDRUN if upstream tasks are not finished.
FYI: if you write the `Raises:` section like:
Raises:
- signals.ENDRUN: if upstream tasks are not finished
(note the added colon) the `signals.ENDRUN` will be code-formatted in the docs, otherwise it'll just be formatted as plain text.
src/prefect/engine/task_runner.py
Outdated
Raises:
- signals.PAUSE if the task raises PAUSE
- signals.ENDRUN if the task is not ready to run
Similar for my other `Raises` comment.
tests/engine/test_task_runner.py
Outdated
cached_inputs={"a": 1},
cached_result=2,
# cached_result_expiration=datetime.datetime.utcnow()
# + datetime.timedelta(minutes=1),
Should these comments just be removed?
Add state_handlers for implementing callbacks when states change
LGTM!
As discussed offline, the Runners are becoming an opaque block of if-statements and `State` manipulations.

This refactor splits the `TaskRunner` into a larger number of small method calls, referred to as "steps". Each step takes a `state` argument (and any other arguments it requires) and does one of two things:

- returns a `State`
- raises an `ENDRUN(State)` error

If it returns a state, that value is passed to the next step in the pipeline. If it raises an `ENDRUN(State)` error, then the pipeline is terminated and the wrapped state becomes the final state of the task.
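
A minimal sketch of this step contract may help make it concrete. It is not the actual Prefect implementation: the `State` dataclass, the simplified step functions, and the `run_pipeline` helper are hypothetical stand-ins, and only the "return a state or raise `ENDRUN(State)`" behavior is taken from the description above.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class State:
    """Hypothetical stand-in for a Prefect state; only a name is tracked."""
    name: str


class ENDRUN(Exception):
    """Raised by a step to stop the pipeline; wraps the final state."""

    def __init__(self, state: State) -> None:
        super().__init__(state.name)
        self.state = state


def check_trigger_step(state: State, upstream_ok: bool = True) -> State:
    # Pass the state through unchanged when the (simplified) trigger passes.
    if not upstream_ok:
        raise ENDRUN(State("TriggerFailed"))
    return state


def set_running_step(state: State) -> State:
    # Only a Pending state may become Running; anything else ends the run.
    if state.name != "Pending":
        raise ENDRUN(state)
    return State("Running")


def run_pipeline(state: State, steps: List[Callable[[State], State]]) -> State:
    # Feed each step's returned state into the next step; an ENDRUN
    # short-circuits the loop and its wrapped state becomes the result.
    try:
        for step in steps:
            state = step(state)
    except ENDRUN as exc:
        state = exc.state
    return state


steps = [check_trigger_step, set_running_step]
print(run_pipeline(State("Pending"), steps).name)  # Running
print(run_pipeline(State("Success"), steps).name)  # Success
```

In the second call, the already-finished state is returned untouched because the second step raises `ENDRUN` with the state it received.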
For example, there is a step called `TaskRunner.check_task_trigger_step()`. When called with a `state` value, it checks the task's trigger function. If the trigger passes, it returns the same `state` so the next step can use it. If the trigger fails, it raises `ENDRUN(TriggerFailed())` (or the actual error raised by the trigger, if appropriate), and all step processing stops.

Another example is a step called `TaskRunner.set_task_to_running_step()`. This expects to get a `PENDING` state and returns a `RUNNING` state if it does; otherwise it raises `ENDRUN(state)` to indicate that an unexpected state was received and processing should halt.

This has already helped identify some subtle edge cases -- for example, the `any_failed` and `any_successful` triggers would fail if the task had no upstream tasks. In practice, this never mattered because tasks with no upstream tasks are always `start_tasks` and therefore ignore their triggers anyway -- but it's still bad practice. Now, tasks with no upstream tasks always pass the trigger check.

Important Note
This new `ENDRUN` exception is very similar to the existing `DONTRUN` signal, so you might ask why we need it. `DONTRUN` is exclusively used by Prefect's engine to indicate that some processing should stop. However, it lives in the `signals` module, which we intend users to use. This creates the need to trap `DONTRUN`s in a special way, since sometimes the TaskRunner/FlowRunner is using them for control flow, and sometimes a user might be using one (without realizing it has special significance). `ENDRUN` is intended to fully replace `DONTRUN` in terms of functionality, and is not even in `signals.py` (so users should not be trying to use it!). If this PR is accepted, I will submit another one (probably much simpler) to refactor the `FlowRunner`, at which time I will remove `DONTRUN` completely. But until that time, it's still needed by the existing `FlowRunner`.
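
To illustrate why a separate engine-only exception removes the ambiguity, here is a rough sketch. Everything in it is an assumption made for illustration: `UserSignal` stands in for whatever a user might raise from the `signals` module, states are plain strings, and `run` is a hypothetical runner, not Prefect's `TaskRunner`.

```python
class ENDRUN(Exception):
    """Engine-internal (sketch): stop processing and use the wrapped state."""

    def __init__(self, state: str) -> None:
        super().__init__(state)
        self.state = state


class UserSignal(Exception):
    """Hypothetical stand-in for a user-facing signal from a signals module."""
    state = "Failed"


def run(task, state: str = "Pending") -> str:
    try:
        if state != "Pending":
            raise ENDRUN(state)  # engine control flow only
        task()                   # user code; may raise a user-facing signal
        return "Success"
    except ENDRUN as exc:
        # Only the engine raises ENDRUN, so no special trapping is needed.
        return exc.state
    except UserSignal as sig:
        # User intent is handled on its own, separate path.
        return sig.state
```

Because the two exception types never overlap, the runner does not have to guess whether a given exception came from the engine or from user code.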