Skip to content

Commit e625129

Browse files
committed
Add initial IB broker backend using ib_insync
Start working towards meeting the backend client api. Infect `asyncio` using `trio`'s new guest mode and demonstrate real-time ticker streaming to console.
1 parent fd5e336 commit e625129

File tree

1 file changed

+242
-0
lines changed

1 file changed

+242
-0
lines changed

piker/brokers/ib.py

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
"""
2+
Interactive Brokers API backend.
3+
"""
4+
import asyncio
5+
from dataclasses import asdict
6+
from typing import List, Dict, Any
7+
from contextlib import asynccontextmanager
8+
9+
import trio
10+
import ib_insync as ibis
11+
from ib_insync.ticker import Ticker
12+
from ib_insync.contract import Contract, ContractDetails
13+
14+
15+
_time_frames = {
16+
'1s': '1 Sec',
17+
'1m': 'OneMinute',
18+
'2m': 'TwoMinutes',
19+
'3m': 'ThreeMinutes',
20+
'4m': 'FourMinutes',
21+
'5m': 'FiveMinutes',
22+
'10m': 'TenMinutes',
23+
'15m': 'FifteenMinutes',
24+
'20m': 'TwentyMinutes',
25+
'30m': 'HalfHour',
26+
'1h': 'OneHour',
27+
'2h': 'TwoHours',
28+
'4h': 'FourHours',
29+
'D': 'OneDay',
30+
'W': 'OneWeek',
31+
'M': 'OneMonth',
32+
'Y': 'OneYear',
33+
}
34+
35+
36+
class Client:
37+
"""IB wrapped for our broker backend API.
38+
"""
39+
def __init__(
40+
self,
41+
ib: ibis.IB,
42+
) -> None:
43+
self.ib = ib
44+
# connect data feed callback...
45+
self.ib.pendingTickersEvent.connect(self.on_tickers)
46+
47+
async def bars(
48+
self,
49+
symbol: str,
50+
# EST in ISO 8601 format is required... below is EPOCH
51+
start_date: str = "1970-01-01T00:00:00.000000-05:00",
52+
time_frame: str = '1m',
53+
count: int = int(20e3), # <- max allowed per query
54+
is_paid_feed: bool = False,
55+
) -> List[Dict[str, Any]]:
56+
"""Retreive OHLCV bars for a symbol over a range to the present.
57+
"""
58+
contract = ibis.ContFuture('ES', exchange='GLOBEX')
59+
# contract = ibis.Stock('WEED', 'SMART', 'CAD')
60+
bars = self.ib.reqHistoricalData(
61+
contract,
62+
endDateTime='',
63+
# durationStr='60 S',
64+
durationStr='2000 S',
65+
barSizeSetting='1 secs',
66+
whatToShow='TRADES',
67+
useRTH=False
68+
)
69+
# barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True)
70+
# convert to pandas dataframe:
71+
df = ibis.util.df(bars)
72+
print(df[['date', 'open', 'high', 'low', 'close', 'volume']])
73+
from piker.ui._source import from_df
74+
a = from_df(df)
75+
# breakpoint()
76+
print(a)
77+
78+
# TODO: reimplement this using async batch requests
79+
# see https://github.com/erdewit/ib_insync/issues/262
80+
async def search_stocks(
81+
self,
82+
pattern: str,
83+
# how many contracts to search "up to"
84+
upto: int = 3,
85+
asdicts: bool = True,
86+
) -> Dict[str, ContractDetails]:
87+
"""Search for stocks matching provided ``str`` pattern.
88+
89+
Return a dictionary of ``upto`` entries worth of contract details.
90+
"""
91+
descriptions = self.ib.reqMatchingSymbols(pattern)
92+
details = {}
93+
for description in descriptions:
94+
con = description.contract
95+
deats = self.ib.reqContractDetails(con)
96+
# XXX: if there is more then one entry in the details list
97+
# then the contract is so called "ambiguous".
98+
for d in deats:
99+
unique_sym = f'{con.symbol}.{con.primaryExchange}'
100+
details[unique_sym] = asdict(d) if asdicts else d
101+
if len(details) == upto:
102+
return details
103+
104+
return details
105+
106+
async def search_futes(
107+
self,
108+
pattern: str,
109+
# how many contracts to search "up to"
110+
upto: int = 3,
111+
asdicts: bool = True,
112+
) -> Dict[str, ContractDetails]:
113+
raise NotImplementedError
114+
115+
def get_cont_fute(
116+
self,
117+
symbol: str,
118+
) -> Contract:
119+
raise NotImplementedError
120+
121+
122+
# default config ports
123+
_tws_port: int = 7497
124+
_gw_port: int = 4002
125+
126+
127+
@asynccontextmanager
128+
async def get_client(
129+
host: str = '127.0.0.1',
130+
port: int = None,
131+
client_id: int = 1,
132+
) -> Client:
133+
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
134+
"""
135+
ib = ibis.IB()
136+
# TODO: some detection magic to figure out if tws vs. the
137+
# gateway is up ad choose the appropriate port
138+
if port is None:
139+
ports = [_tws_port, _gw_port]
140+
else:
141+
ports = [port]
142+
143+
_err = None
144+
# try all default ports
145+
for port in ports:
146+
try:
147+
await ib.connectAsync(host, port, clientId=client_id)
148+
break
149+
except ConnectionRefusedError as ce:
150+
_err = ce
151+
print(f'failed to connect on {port}')
152+
else:
153+
raise ConnectionRefusedError(_err)
154+
155+
yield Client(ib)
156+
ib.disconnect()
157+
158+
159+
if __name__ == '__main__':
160+
161+
con_es = ibis.ContFuture('ES', exchange='GLOBEX')
162+
es = ibis.Future('ES', '20200918', exchange='GLOBEX')
163+
spy = ibis.Stock('SPY', exchange='ARCA')
164+
165+
# ticker = client.ib.reqTickByTickData(
166+
# contract,
167+
# tickType='Last',
168+
# numberOfTicks=1,
169+
# )
170+
# client.ib.reqTickByTickData(
171+
# contract,
172+
# tickType='AllLast',
173+
# numberOfTicks=1,
174+
# )
175+
# client.ib.reqTickByTickData(
176+
# contract,
177+
# tickType='BidAsk',
178+
# numberOfTicks=1,
179+
# )
180+
181+
# ITC (inter task comms)
182+
from_trio = asyncio.Queue()
183+
to_trio, from_aio = trio.open_memory_channel(float("inf"))
184+
185+
async def start_ib(from_trio, to_trio):
186+
print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
187+
async with get_client() as client:
188+
189+
# stream ticks to trio task
190+
def ontick(ticker: Ticker):
191+
for t in ticker.ticks:
192+
# send tick data to trio
193+
to_trio.send_nowait(t)
194+
195+
ticker = client.ib.reqMktData(spy, '588', False, False, None)
196+
ticker.updateEvent += ontick
197+
198+
n = await from_trio.get()
199+
assert n == 0
200+
201+
# sleep and let the engine run
202+
await asyncio.sleep(float('inf'))
203+
204+
# TODO: cmd processing from trio
205+
# while True:
206+
# n = await from_trio.get()
207+
# print(f"aio got: {n}")
208+
# to_trio.send_nowait(n + 1)
209+
210+
async def trio_main():
211+
print("trio_main!")
212+
213+
asyncio.create_task(
214+
start_ib(from_trio, to_trio)
215+
)
216+
217+
from_trio.put_nowait(0)
218+
219+
async for tick in from_aio:
220+
print(f"trio got: {tick}")
221+
222+
# TODO: send cmds to asyncio
223+
# from_trio.put_nowait(n + 1)
224+
225+
async def aio_main():
226+
loop = asyncio.get_running_loop()
227+
228+
trio_done_fut = asyncio.Future()
229+
230+
def trio_done_callback(main_outcome):
231+
print(f"trio_main finished: {main_outcome!r}")
232+
trio_done_fut.set_result(main_outcome)
233+
234+
trio.lowlevel.start_guest_run(
235+
trio_main,
236+
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
237+
done_callback=trio_done_callback,
238+
)
239+
240+
(await trio_done_fut).unwrap()
241+
242+
asyncio.run(aio_main())

0 commit comments

Comments
 (0)