Skip to content

Feature/resume workflow #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
class History(object):
def __init__(self, history):
self._history = history
self._activities = collections.defaultdict(
lambda: {'type': 'activity'})
self._child_workflows = collections.defaultdict(
lambda: {'type': 'child_workflow'})
self._activities = collections.OrderedDict()
self._child_workflows = collections.OrderedDict()
self._tasks = collections.OrderedDict()

@property
def events(self):
Expand All @@ -31,7 +30,9 @@ def get_activity(event):
'scheduled_id': event.id,
}
if event.activity_id not in self._activities:
self._activities[event.activity_id] = activity
id_ = event.activity_id
self._activities[id_] = activity
self._tasks[id_] = self._activities[id_]
else:
# When the executor retries a task, it schedules it again.
# We have to take care of not overriding some values set by the
Expand All @@ -40,10 +41,12 @@ def get_activity(event):
# corresponds to the last execution.
self._activities[event.activity_id].update(activity)
elif event.state == 'schedule_failed':
activity = self._activities[event.activity_id]
activity['state'] = event.state
activity['cause'] = event.cause
activity['activity_type'] = event.activity_type.copy()
self._activities[event.activity_id] = {
'type': 'activity',
'state': event.state,
'cause': event.cause,
'activity_type': event.activity_type.copy(),
}
elif event.state == 'started':
activity = get_activity(event)
activity['state'] = event.state
Expand Down Expand Up @@ -106,7 +109,9 @@ def get_workflow(event):
'state': event.state,
'initiated_event_id': event.id,
}
self._child_workflows[event.workflow_id] = workflow
id_ = event.workflow_id
self._child_workflows[id_] = workflow
self._tasks[id_] = self._child_workflows[id_]
elif event.state == 'started':
workflow = get_workflow(event)
workflow['state'] = event.state
Expand Down
31 changes: 31 additions & 0 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,26 @@ def starmap(self, callable, iterable):
iterable = executor.get_actual_value(iterable)
return super(Executor, self).starmap(callable, iterable)

def merge_previous_execution(self, execution):
    """Inject the input and completed tasks of *execution* into the
    current history so that a resumed workflow does not run them again.

    The input of the first event of the current history is replaced by
    the JSON-encoded input of the first event of the previous history.
    Then every activity and child workflow whose state is
    ``'completed'`` in the previous history is copied into the current
    history's ``_activities`` / ``_child_workflows`` mappings, replacing
    any entry with the same id.

    :param execution: object whose ``history()`` method returns the raw
        history of the execution to resume from.
    """
    def completed(tasks):
        # ``items()`` works on both Python 2 and Python 3 mappings,
        # unlike ``iteritems()``. ``get`` treats an entry that carries
        # no state at all as "not completed" instead of raising.
        return {
            id_: task for id_, task in tasks.items()
            if task.get('state') == 'completed'
        }

    previous_history = History(execution.history())
    # Override input with the previous execution value.
    self._history.events[0].input = json.dumps(
        previous_history.events[0].input.copy(),
    )

    # Override already completed tasks to not execute them again.
    # The previous history must be parsed first: parsing is what fills
    # its ``_activities`` and ``_child_workflows`` mappings.
    previous_history.parse()
    self._history._activities.update(
        completed(previous_history._activities))
    self._history._child_workflows.update(
        completed(previous_history._child_workflows))
    # NOTE(review): History also keeps a combined ``_tasks`` mapping; it
    # is not updated here -- confirm whether resumed tasks must appear
    # in it as well.

def replay(self, history):
"""Executes the workflow from the start until it blocks.

Expand All @@ -291,6 +311,17 @@ def replay(self, history):
args = input.get('args', ())
kwargs = input.get('kwargs', {})

previous_workflow_execution = input.get('_previous_workflow_execution')
if previous_workflow_execution:
# Resume previous execution by injecting input and completed task
# in the current history.
ex = swf.models.WorkflowExecution(
domain=self.domain,
workflow_id=previous_workflow_execution['workflow_id'],
run_id=previous_workflow_execution['run_id'],
)
self.merge_previous_execution(ex)

try:
result = self.run_workflow(*args, **kwargs)
except exceptions.ExecutionBlocked:
Expand Down
44 changes: 44 additions & 0 deletions tests/test_dataflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import functools
import mock
Expand Down Expand Up @@ -1006,3 +1007,46 @@ def test_activity_not_found_schedule_failed_already_exists():
decisions, _ = executor.replay(history)

check_task_scheduled_decision(decisions[0], increment)


def test_resume_stopped_workflow_execution():
    """Replaying a workflow whose input points at a previous execution
    must skip the task already completed there and schedule the next one.

    The previous history contains a completed ``increment`` activity.
    The new history only carries the ``_previous_workflow_execution``
    reference, so the first decision is expected to schedule ``double``.
    """
    workflow = TestDefinition
    executor = Executor(DOMAIN, workflow)

    # History of the execution being resumed: ``increment`` completed.
    previous_history = builder.History(workflow)
    decision_id = previous_history.last_id
    (previous_history
        .add_activity_task(
            increment,
            decision_id=decision_id,
            last_state='completed',
            activity_id='activity-tests.test_dataflow.increment-1',
            input={'args': 1},
            result=2)
        .add_decision_task_scheduled()
        .add_decision_task_started())

    # History of the new execution: no task yet, only the pointer to the
    # execution to resume from.
    history = builder.History(
        workflow,
        input={
            '_previous_workflow_execution': {
                'workflow_id': 'WORKFLOW_ID',
                'run_id': 'RUN_ID',
            }
        })
    (history
        .add_decision_task_scheduled()
        .add_decision_task_started())

    class FakeWorkflowExecution(object):
        """Stand-in for ``swf.models.WorkflowExecution`` that serves the
        pre-built previous history instead of fetching one."""

        def __init__(self, *args, **kwargs):
            pass

        def history(self):
            return previous_history

    with mock.patch('swf.models.WorkflowExecution') as workflow_execution:
        workflow_execution.return_value = FakeWorkflowExecution()
        decisions, _ = executor.replay(history)
        check_task_scheduled_decision(decisions[0], double)