Skip to content

Commit f2434b7

Browse files
committed
Add a mostly actor aware API to IB backend
Infected `asyncio` support is being added to `tractor` in goodboy/tractor#121 so delegate to all that new machinery. Start building out an "actor-aware" api which takes care of all the `trio`-`asyncio` interaction for data streaming and request handling. Add a little (shudder) method proxy system which can be used to invoke client methods from another actor. Start on a streaming api in preparation for real-time charting.
1 parent e625129 commit f2434b7

File tree

1 file changed

+209
-93
lines changed

1 file changed

+209
-93
lines changed

piker/brokers/ib.py

Lines changed: 209 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,29 @@
11
"""
22
Interactive Brokers API backend.
3+
4+
Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
5+
built on it) and thus actor aware apis must be spawned with
6+
``infected_aio==True``.
37
"""
48
import asyncio
59
from dataclasses import asdict
6-
from typing import List, Dict, Any
10+
from functools import partial
11+
import inspect
12+
from typing import List, Dict, Any, Tuple
713
from contextlib import asynccontextmanager
14+
import time
815

9-
import trio
16+
import tractor
17+
from async_generator import aclosing
1018
import ib_insync as ibis
1119
from ib_insync.ticker import Ticker
1220
from ib_insync.contract import Contract, ContractDetails
1321

22+
from ..log import get_logger, get_console_log
23+
24+
25+
log = get_logger(__name__)
26+
1427

1528
_time_frames = {
1629
'1s': '1 Sec',
@@ -35,14 +48,14 @@
3548

3649
class Client:
3750
"""IB wrapped for our broker backend API.
51+
52+
Note: this client requires running inside an ``asyncio`` loop.
3853
"""
3954
def __init__(
4055
self,
4156
ib: ibis.IB,
4257
) -> None:
4358
self.ib = ib
44-
# connect data feed callback...
45-
self.ib.pendingTickersEvent.connect(self.on_tickers)
4659

4760
async def bars(
4861
self,
@@ -57,7 +70,7 @@ async def bars(
5770
"""
5871
contract = ibis.ContFuture('ES', exchange='GLOBEX')
5972
# contract = ibis.Stock('WEED', 'SMART', 'CAD')
60-
bars = self.ib.reqHistoricalData(
73+
bars = await self.ib.reqHistoricalDataAsync(
6174
contract,
6275
endDateTime='',
6376
# durationStr='60 S',
@@ -88,16 +101,25 @@ async def search_stocks(
88101
89102
Return a dictionary of ``upto`` entries worth of contract details.
90103
"""
91-
descriptions = self.ib.reqMatchingSymbols(pattern)
104+
descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
105+
106+
futs = []
107+
for d in descriptions:
108+
con = d.contract
109+
futs.append(self.ib.reqContractDetailsAsync(con))
110+
111+
# batch request all details
112+
results = await asyncio.gather(*futs)
113+
114+
# XXX: if there is more then one entry in the details list
92115
details = {}
93-
for description in descriptions:
94-
con = description.contract
95-
deats = self.ib.reqContractDetails(con)
96-
# XXX: if there is more then one entry in the details list
116+
for details_set in results:
97117
# then the contract is so called "ambiguous".
98-
for d in deats:
118+
for d in details_set:
119+
con = d.contract
99120
unique_sym = f'{con.symbol}.{con.primaryExchange}'
100121
details[unique_sym] = asdict(d) if asdicts else d
122+
101123
if len(details) == upto:
102124
return details
103125

@@ -118,23 +140,38 @@ def get_cont_fute(
118140
) -> Contract:
119141
raise NotImplementedError
120142

143+
async def stream_ticker(
144+
self,
145+
symbol: str,
146+
to_trio,
147+
opts: Tuple[int] = ('233', '375'),
148+
) -> None:
149+
"""Stream a ticker using the std L1 api.
150+
"""
151+
sym, exch = symbol.split('.')
152+
contract = ibis.Stock(sym.upper(), exchange=exch.upper())
153+
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
154+
ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
155+
156+
# let the engine run and stream
157+
await self.ib.disconnectedEvent
158+
121159

122160
# default config ports
123161
_tws_port: int = 7497
124162
_gw_port: int = 4002
125163

126164

127165
@asynccontextmanager
128-
async def get_client(
166+
async def _aio_get_client(
129167
host: str = '127.0.0.1',
130168
port: int = None,
131169
client_id: int = 1,
132170
) -> Client:
133171
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
134172
"""
135173
ib = ibis.IB()
136-
# TODO: some detection magic to figure out if tws vs. the
137-
# gateway is up ad choose the appropriate port
174+
138175
if port is None:
139176
ports = [_tws_port, _gw_port]
140177
else:
@@ -152,91 +189,170 @@ async def get_client(
152189
else:
153190
raise ConnectionRefusedError(_err)
154191

155-
yield Client(ib)
156-
ib.disconnect()
157-
192+
try:
193+
yield Client(ib)
194+
except BaseException:
195+
ib.disconnect()
196+
raise
197+
198+
199+
async def _aio_run_client_method(
200+
meth: str,
201+
to_trio,
202+
from_trio,
203+
**kwargs,
204+
) -> None:
205+
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
206+
async with _aio_get_client() as client:
207+
208+
async_meth = getattr(client, meth)
209+
210+
# handle streaming methods
211+
args = tuple(inspect.getfullargspec(async_meth).args)
212+
if 'to_trio' in args:
213+
kwargs['to_trio'] = to_trio
214+
215+
return await async_meth(**kwargs)
216+
217+
218+
async def _trio_run_client_method(
219+
method: str,
220+
**kwargs,
221+
) -> None:
222+
ca = tractor.current_actor()
223+
assert ca.is_infected_aio()
224+
225+
# if the method is an async gen stream for it
226+
meth = getattr(Client, method)
227+
if inspect.isasyncgenfunction(meth):
228+
kwargs['_treat_as_stream'] = True
229+
230+
# if the method is an async func but streams back results
231+
# make sure to also stream from it
232+
args = tuple(inspect.getfullargspec(meth).args)
233+
if 'to_trio' in args:
234+
kwargs['_treat_as_stream'] = True
235+
236+
result = await tractor.to_asyncio.run_task(
237+
_aio_run_client_method,
238+
meth=method,
239+
**kwargs
240+
)
241+
return result
242+
243+
244+
def get_method_proxy(portal):
245+
246+
class MethodProxy:
247+
def __init__(self, portal: tractor._portal.Portal):
248+
self._portal = portal
249+
250+
async def _run_method(
251+
self,
252+
*,
253+
meth: str = None,
254+
**kwargs
255+
) -> Any:
256+
return await self._portal.run(
257+
__name__,
258+
'_trio_run_client_method',
259+
method=meth,
260+
**kwargs
261+
)
262+
263+
proxy = MethodProxy(portal)
264+
265+
# mock all remote methods
266+
for name, method in inspect.getmembers(
267+
Client, predicate=inspect.isfunction
268+
):
269+
if '_' == name[0]:
270+
continue
271+
setattr(proxy, name, partial(proxy._run_method, meth=name))
272+
273+
return proxy
158274

159-
if __name__ == '__main__':
160275

161-
con_es = ibis.ContFuture('ES', exchange='GLOBEX')
162-
es = ibis.Future('ES', '20200918', exchange='GLOBEX')
163-
spy = ibis.Stock('SPY', exchange='ARCA')
164-
165-
# ticker = client.ib.reqTickByTickData(
166-
# contract,
167-
# tickType='Last',
168-
# numberOfTicks=1,
169-
# )
170-
# client.ib.reqTickByTickData(
171-
# contract,
172-
# tickType='AllLast',
173-
# numberOfTicks=1,
174-
# )
175-
# client.ib.reqTickByTickData(
176-
# contract,
177-
# tickType='BidAsk',
178-
# numberOfTicks=1,
179-
# )
180-
181-
# ITC (inter task comms)
182-
from_trio = asyncio.Queue()
183-
to_trio, from_aio = trio.open_memory_channel(float("inf"))
184-
185-
async def start_ib(from_trio, to_trio):
186-
print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
187-
async with get_client() as client:
188-
189-
# stream ticks to trio task
190-
def ontick(ticker: Ticker):
191-
for t in ticker.ticks:
192-
# send tick data to trio
193-
to_trio.send_nowait(t)
194-
195-
ticker = client.ib.reqMktData(spy, '588', False, False, None)
196-
ticker.updateEvent += ontick
197-
198-
n = await from_trio.get()
199-
assert n == 0
200-
201-
# sleep and let the engine run
202-
await asyncio.sleep(float('inf'))
203-
204-
# TODO: cmd processing from trio
205-
# while True:
206-
# n = await from_trio.get()
207-
# print(f"aio got: {n}")
208-
# to_trio.send_nowait(n + 1)
209-
210-
async def trio_main():
211-
print("trio_main!")
212-
213-
asyncio.create_task(
214-
start_ib(from_trio, to_trio)
215-
)
276+
@asynccontextmanager
277+
async def maybe_spawn_brokerd(
278+
**kwargs,
279+
) -> tractor._portal.Portal:
280+
async with tractor.find_actor('brokerd_ib') as portal:
281+
if portal is None: # no broker daemon created yet
282+
283+
async with tractor.open_nursery() as n:
284+
# XXX: this needs to somehow be hidden
285+
portal = await n.start_actor(
286+
'brokerd_ib',
287+
rpc_module_paths=[__name__],
288+
infect_asyncio=True,
289+
)
290+
async with tractor.wait_for_actor(
291+
'brokerd_ib'
292+
) as portal:
293+
yield portal
294+
295+
# client code may block indefinitely so cancel when
296+
# teardown is invoked
297+
await n.cancel()
216298

217-
from_trio.put_nowait(0)
218299

219-
async for tick in from_aio:
300+
@asynccontextmanager
301+
async def get_client(
302+
**kwargs,
303+
) -> Client:
304+
"""Init the ``ib_insync`` client in another actor and return
305+
a method proxy to it.
306+
"""
307+
async with maybe_spawn_brokerd(**kwargs) as portal:
308+
yield get_method_proxy(portal)
309+
310+
311+
async def trio_stream_ticker(sym):
312+
get_console_log('info')
313+
314+
# con_es = ibis.ContFuture('ES', exchange='GLOBEX')
315+
# es = ibis.Future('ES', '20200918', exchange='GLOBEX')
316+
317+
stream = await tractor.to_asyncio.run_task(
318+
_trio_run_client_method,
319+
method='stream_ticker',
320+
symbol=sym,
321+
)
322+
async with aclosing(stream):
323+
async for ticker in stream:
324+
lft = ticker.lastFillTime
325+
for tick_data in ticker.ticks:
326+
value = tick_data._asdict()
327+
now = time.time()
328+
value['time'] = now
329+
value['last_fill_time'] = lft
330+
if lft:
331+
value['latency'] = now - lft
332+
yield value
333+
334+
335+
async def stream_from_brokerd(sym):
336+
337+
async with maybe_spawn_brokerd() as portal:
338+
stream = await portal.run(
339+
__name__,
340+
'trio_stream_ticker',
341+
sym=sym,
342+
)
343+
async for tick in stream:
220344
print(f"trio got: {tick}")
221345

222-
# TODO: send cmds to asyncio
223-
# from_trio.put_nowait(n + 1)
224-
225-
async def aio_main():
226-
loop = asyncio.get_running_loop()
227-
228-
trio_done_fut = asyncio.Future()
229346

230-
def trio_done_callback(main_outcome):
231-
print(f"trio_main finished: {main_outcome!r}")
232-
trio_done_fut.set_result(main_outcome)
233-
234-
trio.lowlevel.start_guest_run(
235-
trio_main,
236-
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
237-
done_callback=trio_done_callback,
238-
)
347+
if __name__ == '__main__':
348+
import sys
239349

240-
(await trio_done_fut).unwrap()
350+
sym = sys.argv[1]
241351

242-
asyncio.run(aio_main())
352+
tractor.run(
353+
stream_from_brokerd,
354+
sym,
355+
# XXX: must be multiprocessing
356+
start_method='forkserver',
357+
loglevel='info'
358+
)

0 commit comments

Comments
 (0)