Skip to content

Commit

Permalink
Remove step suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin committed Oct 10, 2018
1 parent 5faf7b6 commit f5283e8
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 81 deletions.
36 changes: 18 additions & 18 deletions src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,41 +134,41 @@ def run(

try:
# check if all upstream tasks have finished
state = self.check_upstream_finished_step(
state = self.check_upstream_finished(
state, upstream_states_set=upstream_states_set
)

# check if any upstream tasks skipped (and if we need to skip)
state = self.check_upstream_skipped_step(
state = self.check_upstream_skipped(
state, upstream_states_set=upstream_states_set
)

# check if the task's trigger passes
state = self.check_task_trigger_step(
state = self.check_task_trigger(
state,
upstream_states_set=upstream_states_set,
ignore_trigger=ignore_trigger,
)

# check to make sure the task is in a pending state
state = self.check_task_is_pending_step(state)
state = self.check_task_is_pending(state)

# check to see if the task has a cached result
state = self.check_task_is_cached_step(state, inputs=task_inputs)
state = self.check_task_is_cached(state, inputs=task_inputs)

# set the task state to running
state = self.set_task_to_running_step(state)
state = self.set_task_to_running(state)

# run the task!
state = self.run_task_step(
state = self.run_task(
state, inputs=task_inputs, timeout_handler=timeout_handler
)

# cache the output, if appropriate
state = self.cache_result_step(state, inputs=task_inputs)
state = self.cache_result(state, inputs=task_inputs)

# check if the task needs to be retried
state = self.check_for_retry_step(state, inputs=task_inputs)
state = self.check_for_retry(state, inputs=task_inputs)

# a ENDRUN signal at any point breaks the chain and we return
# the most recently computed state
Expand All @@ -185,7 +185,7 @@ def run(

return state

def check_upstream_finished_step(
def check_upstream_finished(
self, state: State, upstream_states_set: Set[State]
) -> State:
"""
Expand All @@ -205,7 +205,7 @@ def check_upstream_finished_step(
raise ENDRUN(state)
return state

def check_upstream_skipped_step(
def check_upstream_skipped(
self, state: State, upstream_states_set: Set[State]
) -> State:
"""
Expand All @@ -232,7 +232,7 @@ def check_upstream_skipped_step(
)
return state

def check_task_trigger_step(
def check_task_trigger(
self,
state: State,
upstream_states_set: Set[State],
Expand Down Expand Up @@ -279,7 +279,7 @@ def check_task_trigger_step(

return state

def check_task_is_pending_step(self, state: State) -> State:
def check_task_is_pending(self, state: State) -> State:
"""
Checks to make sure the task is in a PENDING state.
Expand Down Expand Up @@ -313,7 +313,7 @@ def check_task_is_pending_step(self, state: State) -> State:
)
raise ENDRUN(state)

def check_task_is_cached_step(self, state: State, inputs: Dict[str, Any]) -> State:
def check_task_is_cached(self, state: State, inputs: Dict[str, Any]) -> State:
"""
Args:
- state (State): the current state of this task
Expand All @@ -332,7 +332,7 @@ def check_task_is_cached_step(self, state: State, inputs: Dict[str, Any]) -> Sta
raise ENDRUN(Success(result=state.cached_result, cached=state))
return state

def set_task_to_running_step(self, state: State) -> State:
def set_task_to_running(self, state: State) -> State:
"""
Sets the task to running
Expand All @@ -350,7 +350,7 @@ def set_task_to_running_step(self, state: State) -> State:

return Running(message="Starting task run.")

def run_task_step(
def run_task(
self, state: State, inputs: Dict[str, Any], timeout_handler: Optional[Callable]
) -> State:
"""
Expand Down Expand Up @@ -402,7 +402,7 @@ def run_task_step(

return Success(result=result, message="Task run succeeded.")

def cache_result_step(self, state: State, inputs: Dict[str, Any]) -> State:
def cache_result(self, state: State, inputs: Dict[str, Any]) -> State:
"""
Caches the result of a successful task, if appropriate.
Expand Down Expand Up @@ -438,7 +438,7 @@ def cache_result_step(self, state: State, inputs: Dict[str, Any]) -> State:

return state

def check_for_retry_step(self, state: State, inputs: Dict[str, Any]) -> State:
def check_for_retry(self, state: State, inputs: Dict[str, Any]) -> State:
"""
Checks to see if a FAILED task should be retried. Also assigns a retry time to
RETRYING states that don't have one set (for example, if raised from inside a task).
Expand Down
Loading

0 comments on commit f5283e8

Please sign in to comment.