Skip to content

Feature/async local executor #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions simpleflow/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ def wait(*fs):
Raises a ``exceptions.ExecutionBlocked`` otherwise.

"""
if any(future.state == PENDING for future in fs):
raise exceptions.ExecutionBlocked()

return [future.result for future in fs]


Expand Down
1 change: 1 addition & 0 deletions simpleflow/local_async/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .executor import Executor
144 changes: 144 additions & 0 deletions simpleflow/local_async/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import logging

import multiprocessing
from simpleflow import (
executor,
futures
)
from concurrent import futures as py_futures


logger = logging.getLogger(__name__)


class AdaptorFuture(futures.Future):
"""A wrapped future object that fills (some of) the semantic gap between
`simpleflow.futures.Future` and `concurrent.futures.Future`
"""
def __init__(self, py_future):
super(AdaptorFuture, self).__init__()
self.py_future = py_future

# TODO make this method in base class call self.state()
# def __repr__(self):
# return '<Future at %s state=%s>' % (
# hex(id(self)),
# _STATE_TO_DESCRIPTION_MAP[self._state])

@property
def result(self):
# will block if the task is not completed yet
return self.py_future.result()

def cancel(self):
raise NotImplementedError()

@property
def state(self):
if self.py_future.running():
return futures.RUNNING
if self.py_future.done():
return futures.FINISHED

return futures.PENDING

@property
def exception(self):
return self.py_future.exception()

@property
def cancelled(self):
# not supported
return False

@property
def running(self):
return self.py_future.running()

@property
def finished(self):
# without cancellation `finish` has the same semantic as `done`
return self.done

@property
def done(self):
return self.py_future.done()


def _get_actual_value(value):
if isinstance(value, AdaptorFuture):
return value.result
return value


class Executor(executor.Executor):
def __init__(self, workflow):
super(Executor, self).__init__(workflow)
# the real executor that does all the stuff
# FIXME cannot use ProcessPoolExecutor, error like:
# PicklingError: Can't pickle <type 'function'>:
# attribute lookup __builtin__.function failed
self._executor = py_futures.ThreadPoolExecutor(
multiprocessing.cpu_count())

def submit(self, func, *args, **kwargs):
logger.info('executing task {}(args={}, kwargs={})'.format(
func, args, kwargs))
args = [_get_actual_value(arg) for arg in args]
kwargs = {key: _get_actual_value(val) for
key, val in kwargs.iteritems()}

py_future = self._executor.submit(func._callable, *args, **kwargs)

# use the adaptor to wrap `concurrent.futures.Future`
return AdaptorFuture(py_future)

def run(self, input=None):
if input is None:
input = {}
args = input.get('args', ())
kwargs = input.get('kwargs', {})

return self.run_workflow(*args, **kwargs)


if __name__ == '__main__':
from simpleflow import activity, Workflow
import time

@activity.with_attributes(task_list='quickstart')
def side_affect():
time.sleep(10)
print 'hey!'

@activity.with_attributes(task_list='quickstart')
def increment(x):
time.sleep(5)
return x + 1

@activity.with_attributes(task_list='quickstart')
def double(x):
time.sleep(5)
return x * 2

class SimpleComputation(Workflow):
def run(self, x):
self.submit(side_affect)
y = self.submit(increment, x)
z = self.submit(double, y)
return z.result

before = time.time()
result = Executor(SimpleComputation).run({"args": [5], "kwargs": {}})
after = time.time()

# Output with:
# >>> 12
# >>> used 10.0062558651 seconds ...
# >>> hey!

# => async execution

print result
print 'used {} seconds ...'.format(after - before)