Skip to content

Commit 58167fe

Browse files
committed
1 parent 4fb9d0f commit 58167fe

File tree

3 files changed

+83
-16
lines changed

3 files changed

+83
-16
lines changed

fastcore/_modidx.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,12 @@
437437
'fastcore/parallel.py'),
438438
'fastcore.parallel.ThreadPoolExecutor.map': ( 'parallel.html#threadpoolexecutor.map',
439439
'fastcore/parallel.py'),
440+
'fastcore.parallel._add_one': ('parallel.html#_add_one', 'fastcore/parallel.py'),
440441
'fastcore.parallel._call': ('parallel.html#_call', 'fastcore/parallel.py'),
441442
'fastcore.parallel._done_pg': ('parallel.html#_done_pg', 'fastcore/parallel.py'),
442443
'fastcore.parallel._f_pg': ('parallel.html#_f_pg', 'fastcore/parallel.py'),
443-
'fastcore.parallel.add_one': ('parallel.html#add_one', 'fastcore/parallel.py'),
444444
'fastcore.parallel.parallel': ('parallel.html#parallel', 'fastcore/parallel.py'),
445+
'fastcore.parallel.parallel_async': ('parallel.html#parallel_async', 'fastcore/parallel.py'),
445446
'fastcore.parallel.parallel_gen': ('parallel.html#parallel_gen', 'fastcore/parallel.py'),
446447
'fastcore.parallel.parallelable': ('parallel.html#parallelable', 'fastcore/parallel.py'),
447448
'fastcore.parallel.run_procs': ('parallel.html#run_procs', 'fastcore/parallel.py'),

fastcore/parallel.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
# %% auto 0
66
__all__ = ['threaded', 'startthread', 'startproc', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel',
7-
'add_one', 'run_procs', 'parallel_gen']
7+
'parallel_async', 'run_procs', 'parallel_gen']
88

99
# %% ../nbs/03a_parallel.ipynb
1010
from .imports import *
@@ -134,12 +134,30 @@ def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None
134134
return L(r)
135135

136136
# %% ../nbs/03a_parallel.ipynb
137-
def add_one(x, a=1):
137+
def _add_one(x, a=1):
138138
# this import is necessary for multiprocessing in notebook on windows
139139
import random
140140
time.sleep(random.random()/80)
141141
return x+a
142142

143+
# %% ../nbs/03a_parallel.ipynb
144+
async def parallel_async(f, items, *args, n_workers=16, total=None,
145+
timeout=None, chunksize=1, on_exc=print, **kwargs):
146+
"Applies `f` to `items` in parallel using asyncio and a semaphore to limit concurrency."
147+
import asyncio
148+
if n_workers is None: n_workers = defaults.cpus
149+
semaphore = asyncio.Semaphore(n_workers)
150+
results = []
151+
152+
async def limited_task(item):
153+
coro = f(item, *args, **kwargs) if asyncio.iscoroutinefunction(f) else asyncio.to_thread(f, item, *args, **kwargs)
154+
async with semaphore:
155+
return await asyncio.wait_for(coro, timeout) if timeout else await coro
156+
157+
tasks = [limited_task(item) for item in items]
158+
if total is None: total = len(items)
159+
return asyncio.gather(*tasks)
160+
143161
# %% ../nbs/03a_parallel.ipynb
144162
def run_procs(f, f_done, args):
145163
"Call `f` for each item in `args` in parallel, yielding `f_done`"

nbs/03a_parallel.ipynb

+61-13
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@
363363
"\n",
364364
"> ProcessPoolExecutor (max_workers=8, on_exc=<built-in function print>,\n",
365365
"> pause=0, mp_context=None, initializer=None,\n",
366-
"> initargs=())\n",
366+
"> initargs=(), max_tasks_per_child=None)\n",
367367
"\n",
368368
"*Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution*"
369369
],
@@ -376,7 +376,7 @@
376376
"\n",
377377
"> ProcessPoolExecutor (max_workers=8, on_exc=<built-in function print>,\n",
378378
"> pause=0, mp_context=None, initializer=None,\n",
379-
"> initargs=())\n",
379+
"> initargs=(), max_tasks_per_child=None)\n",
380380
"\n",
381381
"*Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution*"
382382
]
@@ -432,7 +432,7 @@
432432
"outputs": [],
433433
"source": [
434434
"#|export\n",
435-
"def add_one(x, a=1):\n",
435+
"def _add_one(x, a=1):\n",
436436
" # this import is necessary for multiprocessing in notebook on windows\n",
437437
" import random\n",
438438
" time.sleep(random.random()/80)\n",
@@ -447,11 +447,11 @@
447447
"source": [
448448
"inp,exp = range(50),range(1,51)\n",
449449
"\n",
450-
"test_eq(parallel(add_one, inp, n_workers=2), exp)\n",
451-
"test_eq(parallel(add_one, inp, threadpool=True, n_workers=2), exp)\n",
452-
"test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))\n",
453-
"test_eq(parallel(add_one, inp, n_workers=0), exp)\n",
454-
"test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))"
450+
"test_eq(parallel(_add_one, inp, n_workers=2), exp)\n",
451+
"test_eq(parallel(_add_one, inp, threadpool=True, n_workers=2), exp)\n",
452+
"test_eq(parallel(_add_one, inp, n_workers=1, a=2), range(2,52))\n",
453+
"test_eq(parallel(_add_one, inp, n_workers=0), exp)\n",
454+
"test_eq(parallel(_add_one, inp, n_workers=0, a=2), range(2,52))"
455455
]
456456
},
457457
{
@@ -479,11 +479,11 @@
479479
"name": "stdout",
480480
"output_type": "stream",
481481
"text": [
482-
"0 2024-10-09 16:08:39.462154\n",
483-
"1 2024-10-09 16:08:39.715074\n",
484-
"2 2024-10-09 16:08:39.969191\n",
485-
"3 2024-10-09 16:08:40.221442\n",
486-
"4 2024-10-09 16:08:40.473224\n"
482+
"0 2024-10-12 13:30:21.217649\n",
483+
"1 2024-10-12 13:30:21.469191\n",
484+
"2 2024-10-12 13:30:21.721034\n",
485+
"3 2024-10-12 13:30:21.972793\n",
486+
"4 2024-10-12 13:30:22.223159\n"
487487
]
488488
}
489489
],
@@ -520,6 +520,54 @@
520520
"parallel(die_sometimes, range(8))"
521521
]
522522
},
523+
{
524+
"cell_type": "code",
525+
"execution_count": null,
526+
"metadata": {},
527+
"outputs": [],
528+
"source": [
529+
"#|export\n",
530+
"async def parallel_async(f, items, *args, n_workers=16, total=None,\n",
531+
" timeout=None, chunksize=1, on_exc=print, **kwargs):\n",
532+
" \"Applies `f` to `items` in parallel using asyncio and a semaphore to limit concurrency.\"\n",
533+
" import asyncio\n",
534+
" if n_workers is None: n_workers = defaults.cpus\n",
535+
" semaphore = asyncio.Semaphore(n_workers)\n",
536+
" results = []\n",
537+
"\n",
538+
" async def limited_task(item):\n",
539+
" coro = f(item, *args, **kwargs) if asyncio.iscoroutinefunction(f) else asyncio.to_thread(f, item, *args, **kwargs)\n",
540+
" async with semaphore:\n",
541+
" return await asyncio.wait_for(coro, timeout) if timeout else await coro\n",
542+
"\n",
543+
" tasks = [limited_task(item) for item in items]\n",
544+
" if total is None: total = len(items)\n",
545+
" return asyncio.gather(*tasks)"
546+
]
547+
},
548+
{
549+
"cell_type": "code",
550+
"execution_count": null,
551+
"metadata": {},
552+
"outputs": [],
553+
"source": [
554+
"import asyncio"
555+
]
556+
},
557+
{
558+
"cell_type": "code",
559+
"execution_count": null,
560+
"metadata": {},
561+
"outputs": [],
562+
"source": [
563+
"async def print_time_async(i): \n",
564+
" wait = random.random()\n",
565+
" await asyncio.sleep(wait)\n",
566+
" print(i, datetime.now(), wait)\n",
567+
"\n",
568+
"await parallel_async(print_time_async, range(6), n_workers=3);"
569+
]
570+
},
523571
{
524572
"cell_type": "code",
525573
"execution_count": null,

0 commit comments

Comments
 (0)