Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 558da42

Browse files
committed
concurrency: concurrent_find defaults to raising MultiException if all fail
1 parent 3153299 commit 558da42

File tree

1 file changed

+33
-16
lines changed

1 file changed

+33
-16
lines changed

easypy/concurrency.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@
8787
from easypy.units import MINUTE, HOUR
8888
from easypy.colors import colorize, uncolored
8989
from easypy.sync import SynchronizationCoordinator, ProcessExiting, raise_in_main_thread
90-
90+
from easypy.misc import kwargs_resilient
91+
from easypy.tokens import AUTO
9192

9293
MAX_THREAD_POOL_SIZE = int(os.environ.get('EASYPY_MAX_THREAD_POOL_SIZE', 50))
9394
DISABLE_CONCURRENCY = yesno_to_bool(os.getenv("EASYPY_DISABLE_CONCURRENCY", "no"))
@@ -307,12 +308,12 @@ def add_details(exc):
307308
return buff
308309

309310

310-
def _submit_execution(executor, func, args, kwargs, ctx, funcname=None):
311+
def _submit_execution(executor, func, args, kwargs, ctx, exception_handler=None, funcname=None):
311312
"""
312313
This helper takes care of submitting a function for asynchronous execution, while wrapping and storing
313314
useful information for tracing it in logs (for example, by ``Futures.dump_stacks``)
314315
"""
315-
future = executor.submit(_run_with_exception_logging, func, args, kwargs, ctx)
316+
future = executor.submit(_run_with_exception_logging, func, args, kwargs, ctx, exception_handler)
316317
future.ctx = ctx
317318
future.funcname = funcname or _get_func_name(func)
318319
return future
@@ -401,11 +402,11 @@ class PooledFutures(cls):
401402

402403
killed = False
403404

404-
def submit(self, func, *args, log_ctx={}, **kwargs):
405+
def submit(self, func, *args, log_ctx={}, exception_handler=None, **kwargs):
405406
"Submit a new asynchronous task to this executor"
406407

407408
_ctx = dict(ctx, **log_ctx)
408-
future = executor.submit(_run_with_exception_logging, func, args, kwargs, _ctx)
409+
future = executor.submit(_run_with_exception_logging, func, args, kwargs, _ctx, exception_handler)
409410
future.ctx = _ctx
410411
future.funcname = _get_func_name(func)
411412
self.append(future)
@@ -515,7 +516,7 @@ def logged_wait(self, timeout=None, initial_log_interval=2 * MINUTE):
515516
self.dump_stacks(pending, verbose=global_timer.elapsed >= HOUR)
516517

517518

518-
def _run_with_exception_logging(func, args, kwargs, ctx):
519+
def _run_with_exception_logging(func, args, kwargs, ctx, exception_handler):
519520
"""
520521
Use as a wrapper for functions that run asynchronously, setting up a logging context and
521522
recording the thread in-which they are running, so that we can later log their progress
@@ -534,9 +535,12 @@ def _run_with_exception_logging(func, args, kwargs, ctx):
534535
_logger.debug(exc)
535536
raise
536537
except Exception as exc:
537-
_logger.silent_exception(
538-
"Exception (%s) in thread running %s (traceback in debug logs)",
539-
exc.__class__.__qualname__, func)
538+
if not exception_handler:
539+
_logger.silent_exception(
540+
"Exception (%s) in thread running %s (traceback in debug logs)",
541+
exc.__class__.__qualname__, func)
542+
else:
543+
kwargs_resilient(exception_handler)(exc=exc, func=func)
540544
try:
541545
exc.timestamp = time.time()
542546
except: # noqa - sometimes exception objects are immutable
@@ -625,19 +629,32 @@ def asynchronous(func, params=None, workers=None, log_contexts=None, final_timeo
625629
yield futures
626630

627631

628-
def concurrent_find(func, params, **kw):
632+
def concurrent_find(func, params, concurrent_timeout=None, raise_all=True, exception_handler=AUTO, **kw):
629633
assert not DISABLE_CONCURRENCY, "concurrent_find runs only with concurrency enabled"
630-
timeout = kw.pop("concurrent_timeout", None)
631-
with asynchronous(func, list(params), **kw) as futures:
634+
timeout = concurrent_timeout
635+
636+
if exception_handler is AUTO:
637+
def exception_handler(exc, func):
638+
_logger.debug("Exception (%s) in thread running %s", exc.__class__.__qualname__, func, exc_info=True)
639+
640+
with asynchronous(func, list(params), exception_handler=exception_handler, **kw) as futures:
632641
future = None
633642
try:
634643
for future in futures.as_completed(timeout=timeout):
635644
if not future.exception() and future.result():
636645
futures.kill()
637646
return future.result()
638647
else:
639-
if future:
640-
return future.result()
648+
# we get here if we timed out or all futures raised an exception
649+
if not future:
650+
# must be a timeout
651+
return None
652+
elif raise_all:
653+
# will raise a MultiException
654+
futures.result()
655+
else:
656+
# raise from the last future that completed (backwards compatibility)
657+
future.result()
641658
except FutureTimeoutError as exc:
642659
if not timeout:
643660
# ??
@@ -646,15 +663,15 @@ def concurrent_find(func, params, **kw):
646663
_logger.warning("Concurrent future timed out (%s)", exc)
647664

648665

649-
def nonconcurrent_map(func, params, log_contexts=None, **kw):
666+
def nonconcurrent_map(func, params, log_contexts=None, exception_handler=None, **kw):
650667
futures = Futures()
651668
log_contexts = _to_log_contexts(params, log_contexts)
652669
has_exceptions = False
653670
for args, ctx in zip(_to_args_list(params), log_contexts):
654671
future = Future()
655672
futures.append(future)
656673
try:
657-
result = _run_with_exception_logging(func, args, kw, ctx)
674+
result = _run_with_exception_logging(func, args, kw, ctx, exception_handler)
658675
except Exception as exc:
659676
has_exceptions = True
660677
future.set_exception(exc)

0 commit comments

Comments
 (0)