Skip to content

Commit cea0b0b

Browse files
committed
Support asyncio actors with the trio spawner backend
1 parent 765541e commit cea0b0b

File tree

3 files changed

+38
-17
lines changed

3 files changed

+38
-17
lines changed

tractor/_child.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,8 @@
22
import trio
33
import cloudpickle
44

5+
from ._entry import _trio_main
6+
57
if __name__ == "__main__":
6-
trio.run(cloudpickle.load(sys.stdin.buffer))
8+
subactor, async_fn, kwargs = cloudpickle.load(sys.stdin.buffer)
9+
_trio_main(subactor, async_fn, **kwargs)

tractor/_entry.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
Process entry points.
33
"""
44
from functools import partial
5-
from typing import Tuple, Any
5+
from typing import Tuple, Any, Callable
66

77
import trio # type: ignore
88

@@ -21,6 +21,7 @@ def _mp_main(
2121
forkserver_info: Tuple[Any, Any, Any, Any, Any],
2222
start_method: str,
2323
parent_addr: Tuple[str, int] = None,
24+
infect_asyncio: bool = False,
2425
) -> None:
2526
"""The routine called *after fork* which invokes a fresh ``trio.run``
2627
"""
@@ -56,24 +57,30 @@ def _mp_main(
5657
log.info(f"Actor {actor.uid} terminated")
5758

5859

59-
async def _trio_main(
60+
def _trio_main(
6061
actor: 'Actor',
61-
accept_addr: Tuple[str, int],
62-
parent_addr: Tuple[str, int] = None
62+
trio_main: Callable,
63+
*,
64+
infect_asyncio: bool = False,
6365
) -> None:
6466
"""Entry point for a `trio_run_in_process` subactor.
6567
6668
Here we don't need to call `trio.run()` since trip does that as
6769
part of its subprocess startup sequence.
6870
"""
71+
log.info(f"Started new trio process for {actor.uid}")
72+
6973
if actor.loglevel is not None:
7074
log.info(
7175
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
7276
get_console_log(actor.loglevel)
7377

74-
log.info(f"Started new trio process for {actor.uid}")
75-
7678
_state._current_actor = actor
7779

78-
await actor._async_main(accept_addr, parent_addr=parent_addr)
80+
if infect_asyncio:
81+
actor._infected_aio = True
82+
run_as_asyncio_guest(trio_main)
83+
else:
84+
trio.run(trio_main)
85+
7986
log.info(f"Actor {actor.uid} terminated")

tractor/_spawn.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import subprocess
77
import multiprocessing as mp
88
import platform
9-
from typing import Any, Dict, Optional
9+
from typing import Any, Dict, Optional, Callable
1010
from functools import partial
1111

1212
import trio
@@ -30,7 +30,7 @@
3030
from .log import get_logger
3131
from ._portal import Portal
3232
from ._actor import Actor, ActorFailure
33-
from ._entry import _mp_main, _trio_main
33+
from ._entry import _mp_main
3434

3535

3636
log = get_logger('tractor')
@@ -158,8 +158,14 @@ async def cancel_on_completion(
158158

159159

160160
@asynccontextmanager
161-
async def run_in_process(subactor, async_fn, *args, **kwargs):
162-
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs))
161+
async def run_in_process(
162+
subactor: 'Actor',
163+
async_fn: Callable,
164+
**kwargs
165+
):
166+
encoded_blob = cloudpickle.dumps(
167+
(subactor, async_fn, kwargs,)
168+
)
163169

164170
async with await trio.open_process(
165171
[
@@ -176,7 +182,7 @@ async def run_in_process(subactor, async_fn, *args, **kwargs):
176182
) as proc:
177183

178184
# send func object to call in child
179-
await proc.stdin.send_all(encoded_job)
185+
await proc.stdin.send_all(encoded_blob)
180186
yield proc
181187

182188

@@ -189,6 +195,7 @@ async def new_proc(
189195
bind_addr: Tuple[str, int],
190196
parent_addr: Tuple[str, int],
191197
use_trio_run_in_process: bool = False,
198+
infect_asyncio: bool = False,
192199
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
193200
) -> None:
194201
"""Create a new ``multiprocessing.Process`` using the
@@ -203,10 +210,13 @@ async def new_proc(
203210
if use_trio_run_in_process or _spawn_method == 'trio':
204211
async with run_in_process(
205212
subactor,
206-
_trio_main,
207-
subactor,
208-
bind_addr,
209-
parent_addr,
213+
partial(
214+
subactor._async_main,
215+
accept_addr=bind_addr,
216+
parent_addr=parent_addr,
217+
),
218+
# kwargs bundeled as last pickled tuple element
219+
infect_asyncio=infect_asyncio
210220
) as proc:
211221
log.info(f"Started {proc}")
212222

@@ -274,6 +284,7 @@ async def new_proc(
274284
fs_info,
275285
start_method,
276286
parent_addr,
287+
infect_asyncio,
277288
),
278289
# daemon=True,
279290
name=name,

0 commit comments

Comments
 (0)