Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit a756023

Browse files
committed
concurrency: concurrent_find defaults to raising MultiException if all fail
1 parent 3153299 commit a756023

File tree

2 files changed

+57
-16
lines changed

2 files changed

+57
-16
lines changed

easypy/concurrency.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@
8787
from easypy.units import MINUTE, HOUR
8888
from easypy.colors import colorize, uncolored
8989
from easypy.sync import SynchronizationCoordinator, ProcessExiting, raise_in_main_thread
90-
90+
from easypy.misc import kwargs_resilient
91+
from easypy.tokens import AUTO
9192

9293
MAX_THREAD_POOL_SIZE = int(os.environ.get('EASYPY_MAX_THREAD_POOL_SIZE', 50))
9394
DISABLE_CONCURRENCY = yesno_to_bool(os.getenv("EASYPY_DISABLE_CONCURRENCY", "no"))
@@ -307,12 +308,12 @@ def add_details(exc):
307308
return buff
308309

309310

310-
def _submit_execution(executor, func, args, kwargs, ctx, funcname=None):
311+
def _submit_execution(executor, func, args, kwargs, ctx, exception_handler=None, funcname=None):
311312
"""
312313
This helper takes care of submitting a function for asynchronous execution, while wrapping and storing
313314
useful information for tracing it in logs (for example, by ``Futures.dump_stacks``)
314315
"""
315-
future = executor.submit(_run_with_exception_logging, func, args, kwargs, ctx)
316+
future = executor.submit(_run_with_exception_logging, func, args, kwargs, ctx, exception_handler)
316317
future.ctx = ctx
317318
future.funcname = funcname or _get_func_name(func)
318319
return future
@@ -401,11 +402,11 @@ class PooledFutures(cls):
401402

402403
killed = False
403404

404-
def submit(self, func, *args, log_ctx={}, **kwargs):
405+
def submit(self, func, *args, log_ctx={}, exception_handler=None, **kwargs):
405406
"Submit a new asynchronous task to this executor"
406407

407408
_ctx = dict(ctx, **log_ctx)
408-
future = executor.submit(_run_with_exception_logging, func, args, kwargs, _ctx)
409+
future = executor.submit(_run_with_exception_logging, func, args, kwargs, _ctx, exception_handler)
409410
future.ctx = _ctx
410411
future.funcname = _get_func_name(func)
411412
self.append(future)
@@ -515,7 +516,7 @@ def logged_wait(self, timeout=None, initial_log_interval=2 * MINUTE):
515516
self.dump_stacks(pending, verbose=global_timer.elapsed >= HOUR)
516517

517518

518-
def _run_with_exception_logging(func, args, kwargs, ctx):
519+
def _run_with_exception_logging(func, args, kwargs, ctx, exception_handler):
519520
"""
520521
Use as a wrapper for functions that run asynchronously, setting up a logging context and
521522
recording the thread in-which they are running, so that we can later log their progress
@@ -534,9 +535,12 @@ def _run_with_exception_logging(func, args, kwargs, ctx):
534535
_logger.debug(exc)
535536
raise
536537
except Exception as exc:
537-
_logger.silent_exception(
538-
"Exception (%s) in thread running %s (traceback in debug logs)",
539-
exc.__class__.__qualname__, func)
538+
if not exception_handler:
539+
_logger.silent_exception(
540+
"Exception (%s) in thread running %s (traceback in debug logs)",
541+
exc.__class__.__qualname__, func)
542+
else:
543+
kwargs_resilient(exception_handler)(exc=exc, func=func)
540544
try:
541545
exc.timestamp = time.time()
542546
except: # noqa - sometimes exception objects are immutable
@@ -625,19 +629,39 @@ def asynchronous(func, params=None, workers=None, log_contexts=None, final_timeo
625629
yield futures
626630

627631

628-
def concurrent_find(func, params, **kw):
632+
def concurrent_find(func, params, concurrent_timeout=None, raise_all=True, exception_handler=AUTO, **kw):
629633
assert not DISABLE_CONCURRENCY, "concurrent_find runs only with concurrency enabled"
630-
timeout = kw.pop("concurrent_timeout", None)
631-
with asynchronous(func, list(params), **kw) as futures:
634+
timeout = concurrent_timeout
635+
636+
if exception_handler is AUTO:
637+
def exception_handler(exc, func):
638+
_logger.debug("Exception (%s) in thread running %s", exc.__class__.__qualname__, func, exc_info=True)
639+
640+
with asynchronous(func, list(params), exception_handler=exception_handler, **kw) as futures:
632641
future = None
642+
any_result = None
633643
try:
634644
for future in futures.as_completed(timeout=timeout):
635-
if not future.exception() and future.result():
645+
if future.exception():
646+
continue
647+
any_result = True
648+
if future.result():
636649
futures.kill()
637650
return future.result()
638651
else:
639-
if future:
652+
# we get here if we timed out or all futures raised an exception
653+
if not future:
654+
# must be a timeout
655+
return None
656+
elif any_result:
657+
# return from the last future that completed
640658
return future.result()
659+
elif raise_all:
660+
# will raise a MultiException
661+
futures.result()
662+
else:
663+
# will raise from the last future that completed
664+
future.result()
641665
except FutureTimeoutError as exc:
642666
if not timeout:
643667
# ??
@@ -646,15 +670,15 @@ def concurrent_find(func, params, **kw):
646670
_logger.warning("Concurrent future timed out (%s)", exc)
647671

648672

649-
def nonconcurrent_map(func, params, log_contexts=None, **kw):
673+
def nonconcurrent_map(func, params, log_contexts=None, exception_handler=None, **kw):
650674
futures = Futures()
651675
log_contexts = _to_log_contexts(params, log_contexts)
652676
has_exceptions = False
653677
for args, ctx in zip(_to_args_list(params), log_contexts):
654678
future = Future()
655679
futures.append(future)
656680
try:
657-
result = _run_with_exception_logging(func, args, kw, ctx)
681+
result = _run_with_exception_logging(func, args, kw, ctx, exception_handler)
658682
except Exception as exc:
659683
has_exceptions = True
660684
future.set_exception(exc)

tests/test_concurrency.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,23 @@ def test_multiobject_concurrent_find_proper_shutdown():
202202
assert max(executed) <= 2
203203

204204

205+
def test_multiobject_concurrent_find_exceptions_logged():
206+
m = MultiObject("123ab")
207+
208+
# we'll mock the logger so we can ensure it logged
209+
with patch("easypy.concurrency._logger") as _logger:
210+
ret = m.concurrent_find(int)
211+
212+
assert ret in (1, 2, 3)
213+
assert _logger.debug.call_count == 2
214+
215+
exceptions = []
216+
ret = m.concurrent_find(int, exception_handler=lambda exc: exceptions.append(exc))
217+
218+
assert ret in (1, 2, 3)
219+
assert all(isinstance(exc, ValueError) for exc in exceptions)
220+
221+
205222
def test_multiobject_zip_with():
206223
m = MultiObject(range(4))
207224

0 commit comments

Comments
 (0)