-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasynchronizer.py
More file actions
118 lines (92 loc) · 4.14 KB
/
asynchronizer.py
File metadata and controls
118 lines (92 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import concurrent.futures
import functools
import threading
from typing import Any, Callable, Optional, MutableSet, List, Tuple
from _asynchronizer_ext import Thread, IoContext, ErrorCategory, ErrorCode
__all__ = ['Thread', 'IoContext', 'ErrorCategory', 'ErrorCode', 'Asynchronizer']
class Asynchronizer:
    """
    Bridge between callback-style asynchronous functions and asyncio futures.

    Every started operation gets its own future created on the configured
    event loop; completion callbacks are routed through a shared ready queue.
    """

    def __init__(
        self,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """
        Initializes Asynchronizer object.

        :param loop: Asyncio event loop. When omitted, the result of
            ``asyncio.get_event_loop()`` is used.
        :type loop: Optional[asyncio.AbstractEventLoop]
        """
        self._loop = asyncio.get_event_loop() if loop is None else loop
        self._ready_queue = _ReadyQueue(loop=self._loop)

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Event loop on which the futures of this object are created."""
        return self._loop

    def call_async(
        self,
        start_task: Callable[[Callable[..., None]], None],
        *,
        default: Any = None,
        pack_single: bool = False,
    ) -> asyncio.Future:
        """
        Wraps Boost.Asio async function for use with Python asyncio awaitable features.

        Example 1:

            // C++. Calls `on_ready(error_code)` on complete
            void async_fn(
                std::shared_ptr<boost::asio::io_context> io_context_ptr,
                std::function<void (const boost::error_code &)> on_ready
            );

            # Python. Calls `async_fn` with some arguments
            async def another_fn():
                io_context = make_io_context()
                ec = await call_async(functools.partial(async_fn, io_context))
                if ec.value == 0:
                    print('Completed successfully!')

        Example 2:

            // C++. Calls something like `on_ready(42, "Hello, world!")` on complete
            void async_fn(std::function<void (int, std::string)> on_ready);

            # Python. Gets several parameters as return value
            async def another_fn():
                my_int, my_str = await call_async(async_fn)
                print(f'my_int={my_int}, my_str={my_str}')

        :param start_task: Function which should be called with some callback
            `on_ready(*results: Any)` as only argument
        :type start_task: Callable[[Callable[..., None]], None]
        :param default: Default result which will be set as future result
        :type default: Any
        :param pack_single: Force single result packing as tuple
        :type pack_single: bool
        :return: Future associated with started asynchronous operation result
        :rtype: asyncio.Future
        """
        pending = self._loop.create_future()
        # The completion callback is obtained first and only then handed to the
        # caller-supplied starter, exactly once.
        on_ready = self._ready_queue.prepare(
            future=pending,
            default=default,
            pack_single=pack_single,
        )
        start_task(on_ready)
        return pending
class _ReadyQueue:
    """
    Collects results reported by completion callbacks and delivers them to
    their asyncio futures from a coroutine scheduled on the event loop.

    Results are appended under a lock and flushed in batches: at most one
    flush job is outstanding at any time.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """
        :param loop: Event loop on which results are delivered to futures
        :type loop: asyncio.AbstractEventLoop
        """
        self.loop = loop
        # Guards `_ready` and `_set_results_job`.
        self._lock = threading.Lock()
        # Callbacks handed out by `prepare()` whose futures are not done yet.
        # NOTE(review): presumably kept so the callback stays alive while only
        # native code refers to it - confirm against the extension module.
        self._ready_callbacks: MutableSet[Callable[..., None]] = set()
        # Pending (future, result) pairs waiting to be delivered on the loop.
        self._ready: List[Tuple[asyncio.Future, Any]] = []
        # Handle of the scheduled flush job, or None when none is outstanding.
        self._set_results_job: Optional[concurrent.futures.Future] = None

    def prepare(self, future: asyncio.Future, default: Any, pack_single: bool) -> Callable[..., None]:
        """
        Builds the completion callback bound to ``future``.

        :param future: Future which receives the callback result
        :param default: Result used when the callback is invoked without arguments
        :param pack_single: Keep a single callback argument packed as a 1-tuple
        :return: Callback accepting any number of positional results
        """
        on_ready = functools.partial(self._on_ready, future=future, default=default, pack_single=pack_single)
        # Forget the callback as soon as the future is done (result set or cancelled).
        future.add_done_callback(lambda _future: self._ready_callbacks.discard(on_ready))
        self._ready_callbacks.add(on_ready)
        return on_ready

    def _on_ready(self, *result: Any, future: asyncio.Future, default: Any, pack_single: bool) -> None:
        """
        Queues the result for ``future`` and schedules a flush job if none is pending.

        Result shape: no arguments -> ``default``; exactly one argument ->
        that argument (or a 1-tuple when ``pack_single`` is set); several
        arguments -> the tuple of all of them.
        """
        if not result:
            value = default
        elif len(result) == 1 and not pack_single:
            value = result[0]
        else:
            # Several results must stay together so that
            # `a, b = await call_async(...)` works; previously everything but
            # the first argument was silently dropped.
            value = result
        with self._lock:
            self._ready.append((future, value))
            if self._set_results_job is None:
                # run_coroutine_threadsafe is used so the hand-off also works
                # when this callback runs outside the event loop thread.
                self._set_results_job = asyncio.run_coroutine_threadsafe(self._set_results(), loop=self.loop)

    async def _set_results(self) -> None:
        """Delivers all queued results to their futures (runs on the event loop)."""
        with self._lock:
            # Take the whole batch and reset the job marker first, so that a
            # failure below can never leave the queue permanently stuck.
            batch, self._ready = self._ready, []
            self._set_results_job = None
        for future, value in batch:
            # A future may have been cancelled (or otherwise completed) while
            # its result was queued; set_result would raise InvalidStateError.
            if not future.done():
                future.set_result(value)