File tree Expand file tree Collapse file tree 1 file changed +10
-7
lines changed Expand file tree Collapse file tree 1 file changed +10
-7
lines changed Original file line number Diff line number Diff line change @@ -220,24 +220,27 @@ def process_loop_asynchronous(
220
220
self ,
221
221
requests_queue : multiprocessing .Queue ,
222
222
results_queue : multiprocessing .Queue ,
223
- max_concurrency : Optional [ int ] ,
223
+ max_concurrency : int ,
224
224
process_id : int ,
225
225
):
226
226
async def _process_runner ():
227
- pending = asyncio .Semaphore (max_concurrency ) if max_concurrency else None
227
+ pending = asyncio .Semaphore (max_concurrency )
228
+
229
+ if pending .locked ():
230
+ raise ValueError (
231
+ "Async worker called with max_concurrency < 1"
232
+ )
228
233
229
234
while (
230
235
process_request := await self .get_request (requests_queue )
231
236
) is not None :
232
237
dequeued_time = time .time ()
233
238
234
- if pending :
235
- await pending .acquire ()
239
+ await pending .acquire ()
236
240
237
241
def _task_done (_ : asyncio .Task ):
238
242
nonlocal pending
239
- if pending :
240
- pending .release ()
243
+ pending .release ()
241
244
242
245
task = asyncio .create_task (
243
246
self .resolve_scheduler_request (
@@ -325,7 +328,7 @@ def process_loop_asynchronous(
325
328
self ,
326
329
requests_queue : multiprocessing .Queue ,
327
330
results_queue : multiprocessing .Queue ,
328
- max_concurrency : Optional [ int ] ,
331
+ max_concurrency : int ,
329
332
process_id : int ,
330
333
):
331
334
asyncio .run (self .backend .validate ())
You can’t perform that action at this time.
0 commit comments