TaskRunner refactor #260

Merged
merged 19 commits into from
Oct 11, 2018

@jlowin jlowin commented Oct 10, 2018

As discussed offline, the Runners are becoming an opaque block of if-statements and State manipulations.

This refactor splits the TaskRunner into a larger number of small method calls, referred to as "steps". Each step takes a state argument (and any other arguments it requires) and does one of two things:

  • returns a State
  • raises an ENDRUN(State) error

If it returns a state, that value is passed to the next step in the pipeline. If it raises an ENDRUN(State) error, the pipeline is terminated and the wrapped state becomes the final state of the task.

For example, there is a step called TaskRunner.check_task_trigger_step(). When called with a state value, it checks the task's trigger function. If the trigger passes, it returns the same state so the next step can use it. If the trigger fails, it raises ENDRUN(TriggerFailed()) (or the actual error raised by the trigger, if appropriate), and all step processing stops.

Another example is a step called TaskRunner.set_task_to_running_step(). It expects to receive a PENDING state and, if it does, returns a RUNNING state; otherwise it raises ENDRUN(state) to indicate that an unexpected state was received and processing should halt.
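To make the step contract concrete, here is a minimal sketch of such a pipeline. It is not the code from this PR: the State classes, the ENDRUN constructor, the step bodies, and the run_pipeline helper are simplified, hypothetical stand-ins that only illustrate "return a state or raise ENDRUN(state)".

```python
class State:
    """Hypothetical stand-in for a Prefect state."""


class Pending(State):
    pass


class Running(State):
    pass


class Success(State):
    pass


class ENDRUN(Exception):
    """Assumed shape: wraps the state that becomes the final state of the run."""

    def __init__(self, state):
        super().__init__()
        self.state = state


def set_task_to_running(state):
    # Expects a Pending state; anything else halts the pipeline unchanged.
    if not isinstance(state, Pending):
        raise ENDRUN(state)
    return Running()


def run_task(state):
    # Placeholder for actually running the task.
    return Success()


def run_pipeline(state, steps):
    # Feed each step's returned state into the next step.
    # An ENDRUN stops processing and its wrapped state is the result.
    try:
        for step in steps:
            state = step(state)
    except ENDRUN as exc:
        state = exc.state
    return state


steps = [set_task_to_running, run_task]
final_state = run_pipeline(Pending(), steps)       # runs every step
already_done = Success()
short_circuited = run_pipeline(already_done, steps)  # halts at the first step
```

Because each step is an ordinary function of a state, it can be tested in isolation by passing in one state and checking either the returned state or the state wrapped in ENDRUN.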

This has already helped identify some subtle edge cases -- for example, the any_failed and any_successful triggers would fail if the task had no upstream tasks. In practice, this never mattered because tasks with no upstream tasks are always start_tasks and therefore ignore their triggers anyway -- but it's still bad practice. Now, tasks with no upstream tasks always pass the trigger check.
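The trigger implementations are not shown in this thread, so the following is only a guess at how such an edge case can arise: a trigger written with any() over an empty collection evaluates to False. The names below are hypothetical, and the guard in check_task_trigger is one possible way to express "tasks with no upstream tasks always pass".

```python
class State:
    """Hypothetical stand-in for a task state."""


class Success(State):
    pass


class Failed(State):
    pass


def any_failed(upstream_states):
    # Naive trigger: passes only if at least one upstream state is Failed.
    # With no upstream states, any() over nothing is False.
    return any(isinstance(s, Failed) for s in upstream_states)


def check_task_trigger(upstream_states, trigger):
    # Assumed guard: with nothing upstream there is nothing to evaluate,
    # so the check passes without calling the trigger.
    if not upstream_states:
        return True
    return trigger(upstream_states)
```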

Important Note

This new ENDRUN exception is very similar to the existing DONTRUN signal, so you might ask why we need it.

DONTRUN is used exclusively by Prefect's engine to indicate that some processing should stop. However, it lives in the signals module, which we intend users to use. This creates the need to trap DONTRUNs in a special way, since sometimes the TaskRunner/FlowRunner is using them for control flow and sometimes a user might be using them (without realizing they have special significance).

ENDRUN is intended to fully replace DONTRUN in terms of functionality, and is not even in signals.py (so users should not be trying to use it!). If this PR is accepted, I will submit another one (probably much simpler) to refactor the FlowRunner, at which time I will remove DONTRUN completely. But until that time, it's still needed by the existing FlowRunner.

jlowin commented Oct 10, 2018

@cicdw making one more small change here to split out caching from running.

jlowin commented Oct 10, 2018

@cicdw ready for review!

cicdw commented Oct 10, 2018

I haven't finished a full detailed review but two incredibly superficial / trivial things stood out to me:

  • in other places we simply call the input argument upstream_states and here you're calling it upstream_states_set; I think I prefer just upstream_states because technically the logic is sound even if you pass a list / iterable
  • the pattern self.do_a_thing_step reads awkwardly to me

jlowin commented Oct 10, 2018

I agree with both points.

upstream_states_set was actually because I got tired of copying that huge Dict[Edge, Union...] type everywhere... and just didn't replace it later. The truth is I think that the set is what we want for most of these (the rest of the information is superfluous; often we just want a set of states). Let me look at cleanly replacing it.

These could definitely lose the word step -- it's a holdover from when they were classes (and just a way to set them apart). It doesn't serve any purpose other than to identify them as having a specific signature. Should I just remove the _step suffix?

@jlowin jlowin force-pushed the taskrunner-refactor branch from f5283e8 to 25d93f7 Compare October 11, 2018 00:41
jlowin commented Oct 11, 2018

On further thought, the upstream_states_set is a holdover from the previous implementation -- I think it was handled worse there, where the call was get_run_state(upstream_states=upstream_states_set) -- so I'm trying to unify the kwarg and the variable name.

upstream_states has type Dict[Edge, Union[State, List[State]]]
upstream_states_set has type Set[State], so it's much simpler for the steps to work with (and they don't need any other detail)
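As a rough illustration of the relationship between the two types, the sketch below flattens a mapping of that shape into a set. The helper name flatten_upstream_states is hypothetical, State is a bare stand-in, and plain hashable keys are used in place of Edge; this is not the conversion code from the PR.

```python
from typing import Dict, Hashable, List, Set, Union


class State:
    """Hypothetical stand-in for a Prefect state."""


def flatten_upstream_states(
    upstream_states: Dict[Hashable, Union[State, List[State]]]
) -> Set[State]:
    # Discard the edge keys and collect every state into one set,
    # unpacking values that are lists of states.
    result: Set[State] = set()
    for value in upstream_states.values():
        if isinstance(value, list):
            result.update(value)
        else:
            result.add(value)
    return result


a, b, c = State(), State(), State()
upstream_states_set = flatten_upstream_states({"edge_1": a, "edge_2": [b, c]})
```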

@cicdw cicdw left a comment

Some incredibly minor superficial changes requested; this is a really nice refactor and I really like how this makes the tests very targeted and easy to understand / read. 💯

I also think it's a great sign that no FlowRunner or Flow tests (other than the one) had to be updated to handle this significant refactor.

CHANGELOG.md Outdated
@@ -3,7 +3,7 @@
## 0.4.0 <Badge text="alpha" type="warn">
### Major Features

- None
- Refactor `TaskRunner` into a moduler pipeline - [#260](https://github.com/PrefectHQ/prefect/pull/260)

modular (sp)

# check that upstream tasks are finished
# ---------------------------------------------------------
Raises:
- signals.ENDRUN if upstream tasks are not finished.

FYI: if you write the Raises: section like:

Raises:
    - signals.ENDRUN: if upstream tasks are not finished

(note the added colon) the signals.ENDRUN will be code-formatted in the docs, otherwise it'll just be formatted as plain text.


Raises:
- signals.PAUSE if the task raises PAUSE
- signals.ENDRUN if the task is not ready to run

Similar for my other Raises comment.

cached_inputs={"a": 1},
cached_result=2,
# cached_result_expiration=datetime.datetime.utcnow()
# + datetime.timedelta(minutes=1),

Should these comments just be removed?

@cicdw cicdw left a comment

LGTM!

@jlowin jlowin merged commit 4abd6e3 into master Oct 11, 2018
@jlowin jlowin deleted the taskrunner-refactor branch October 11, 2018 21:01
@jlowin jlowin mentioned this pull request Oct 11, 2018