Skip to content

Commit

Permalink
History and Lookup Sockets now working.
Browse files Browse the repository at this point in the history
  • Loading branch information
akapur committed Mar 5, 2016
1 parent 42c3958 commit 871f435
Showing 1 changed file with 198 additions and 19 deletions.
217 changes: 198 additions & 19 deletions conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,8 @@ class HistoryConn(FeedConn):
('prd_vlm', 'u8'),
('open_int', 'u8')])



_databuf = namedtuple("_databuf", ['failed', 'err_msg', 'num_pts', 'raw_data'])

def __init__(self, name: str = "HistoryConn", host: str = FeedConn.host, port: int = port):
Expand All @@ -1102,13 +1104,13 @@ def __init__(self, name: str = "HistoryConn", host: str = FeedConn.host, port: i

def _set_message_mappings(self) -> None:
super()._set_message_mappings()
self._pf_dict['H'] = self.process_hist_datum
self._pf_dict['H'] = self.process_datum

def send_connect_message(self):
# The history socket does not accept connect messages
pass

def process_hist_datum(self, fields: Sequence[str]) -> None:
def process_datum(self, fields: Sequence[str]) -> None:
req_id = fields[0]
if 'E' == fields[1]:
# Error
Expand All @@ -1121,7 +1123,6 @@ def process_hist_datum(self, fields: Sequence[str]) -> None:
elif '!ENDMSG!' == fields[1]:
self._req_event[req_id].set()
else:
# self._req_buf[req_id] += (datum + '\n')
self._req_buf[req_id].append(fields)
self._req_numlines[req_id] += 1

Expand Down Expand Up @@ -1240,11 +1241,11 @@ def request_ticks_for_days(self, ticker: str, num_days: int,
else:
return data

# noinspection PyPep8
def request_ticks_in_period(self, ticker: str, bgn_prd: datetime.datetime, end_prd: datetime.datetime,
bgn_flt: datetime.time=None, end_flt: datetime.time=None, ascend: bool=False,
max_ticks: int=None, timeout: int=None) -> np.array:
# HTT,[Symbol],[BeginDate BeginTime],[EndDate EndTime],[MaxDatapoints],[BeginFilterTime],[EndFilterTime],[DataDirection],[RequestID],[DatapointsPerSend]<CR><LF>
# HTT,[Symbol],[BeginDate BeginTime],[EndDate EndTime],[MaxDatapoints],[BeginFilterTime],[EndFilterTime],
# [DataDirection],[RequestID],[DatapointsPerSend]<CR><LF>
req_id = self._get_next_req_id()
self._setup_request_data(req_id)
bp_str = datetime_to_yyyymmdd_hhmmss(bgn_prd)
Expand Down Expand Up @@ -1325,12 +1326,12 @@ def request_bars_for_days(self, ticker: str, interval_len: int, interval_type: s
else:
return data

# noinspection PyPep8
def request_bars_in_period(self, ticker: str, interval_len: int, interval_type: str,
bgn_prd: datetime.datetime, end_prd: datetime.datetime,
bgn_flt: datetime.time=None, end_flt: datetime.time=None, ascend: bool=False,
max_bars: int=None, timeout: int=None) -> np.array:
# HIT,[Symbol],[Interval],[BeginDate BeginTime],[EndDate EndTime],[MaxDatapoints],[BeginFilterTime],[EndFilterTime],[DataDirection],[RequestID],[DatapointsPerSend],[IntervalType]<CR><LF>
# HIT,[Symbol],[Interval],[BeginDate BeginTime],[EndDate EndTime],[MaxDatapoints],[BeginFilterTime],
# [EndFilterTime],[DataDirection],[RequestID],[DatapointsPerSend],[IntervalType]<CR><LF>
assert interval_type in ('s', 'v', 't')
req_id = self._get_next_req_id()
self._setup_request_data(req_id)
Expand All @@ -1351,7 +1352,6 @@ def request_bars_in_period(self, ticker: str, interval_len: int, interval_type:
else:
return data

# noinspection PyUnresolvedReferences
def read_days(self, req_id: str) -> np.array:
res = self.get_data_buf(req_id)
if res.failed:
Expand Down Expand Up @@ -1647,25 +1647,190 @@ def update_naic_codes(self):


class LookupConn(FeedConn):
pass
port = 9100

asset_type = np.dtype([('symbol', 'S128'),
('market', 'u1'),
('security_type', 'u1'),
('name', 'S128'),
('sector', 'u8')])

_databuf = namedtuple("_databuf", ['failed', 'err_msg', 'num_pts', 'raw_data'])

def __init__(self, name: str = "LookupConn", host: str = FeedConn.host, port: int = port):
super().__init__(name, host, port)
self._set_message_mappings()
self._req_num = 0
self._req_buf = {}
self._req_numlines = {}
self._req_event = {}
self._req_failed = {}
self._req_err = {}
self._buf_lock = threading.RLock()

if __name__ == "__main__":
from service import FeedService
from passwords import dtn_login, dtn_password, dtn_product_id
def _set_message_mappings(self) -> None:
super()._set_message_mappings()
self._pf_dict['L'] = self.process_lookup_datum

svc = FeedService(product=dtn_product_id, version="Debugging", login=dtn_login, password=dtn_password,
autoconnect=True, savelogininfo=True)
svc.launch()
def send_connect_message(self):
# The history socket does not accept connect messages
pass

def process_lookup_datum(self, fields: Sequence[str]) -> None:
req_id = fields[0]
if 'E' == fields[1]:
# Error
self._req_failed[req_id] = True
err_msg = "Unknown Error"
if len(fields) > 2:
if fields[2] != "":
err_msg = fields[2]
self._req_err[req_id] = err_msg
elif '!ENDMSG!' == fields[1]:
self._req_event[req_id].set()
else:
self._req_buf[req_id].append(fields)
self._req_numlines[req_id] += 1

def _get_next_req_id(self) -> str:
with self._buf_lock:
req_id = "L_%.10d" % self._req_num
self._req_num += 1
return req_id

def _cleanup_request_data(self, req_id: str) -> None:
with self._buf_lock:
del self._req_failed[req_id]
del self._req_err[req_id]
del self._req_buf[req_id]
del self._req_numlines[req_id]

def _setup_request_data(self, req_id: str) -> None:
with self._buf_lock:
self._req_buf[req_id] = deque()
self._req_numlines[req_id] = 0
self._req_failed[req_id] = False
self._req_err[req_id] = ""
self._req_event[req_id] = threading.Event()

def get_data_buf(self, req_id: str) -> namedtuple:
with self._buf_lock:
buf = LookupConn._databuf(
failed=self._req_failed[req_id],
err_msg=self._req_err[req_id],
num_pts=self._req_numlines[req_id],
raw_data=self._req_buf[req_id]
)
self._cleanup_request_data(req_id)
return buf

def read_symbols(self, req_id: str) -> np.array:
res = self.get_data_buf(req_id)
if res.failed:
return np.array([res.err_msg], dtype='object')
else:
data = np.empty(res.num_pts, LookupConn.asset_type)
line_num = 0
while res.raw_data and (line_num < res.num_pts):
dl = res.raw_data.popleft()
data[line_num]['symbol'] = dl[1].strip()
data[line_num]['market'] = read_uint8(dl[2])
data[line_num]['security_type'] = read_uint8(dl[3])
data[line_num]['name'] = dl[4].strip()
data[line_num]['sector'] = 0
line_num += 1
if line_num >= res.num_pts:
assert len(res.raw_data) == 0
if len(res.raw_data) == 0:
assert line_num >= res.num_pts
return data

def request_symbols_by_filter(self, search_term: str, search_field: str='d',
filt_val: str=None, filt_type: str=None, timeout=None) -> np.array:
# SBF,[Field To Search],[Search String],[Filter Type],[Filter Value],[RequestID]<CR><LF>
assert search_field in ('d', 's')
assert search_term is not None
assert filt_type is None or filt_type in ('e', 't')

req_id = self._get_next_req_id()
self._setup_request_data(req_id)
req_cmd = "SBF,%s,%s,%s,%s,%s\r\n" % (search_field, search_term,
blob_to_str(filt_type), blob_to_str(filt_val), req_id)
self.send_cmd(req_cmd)
self._req_event[req_id].wait(timeout=timeout)
data = self.read_symbols(req_id)
if data.dtype == object:
err_msg = "Request: %s, Error: %s" % (req_cmd, str(data[0]))
raise RuntimeError(err_msg)
else:
return data

def read_symbols_with_sect(self, req_id: str) -> np.array:
res = self.get_data_buf(req_id)
if res.failed:
return np.array([res.err_msg], dtype='object')
else:
data = np.empty(res.num_pts, LookupConn.asset_type)
line_num = 0
while res.raw_data and (line_num < res.num_pts):
dl = res.raw_data.popleft()
data[line_num]['sector'] = read_uint64(dl[1])
data[line_num]['symbol'] = dl[2].strip()
data[line_num]['market'] = read_uint8(dl[3])
data[line_num]['security_type'] = read_uint8(dl[4])
data[line_num]['name'] = dl[5].strip()
line_num += 1
if line_num >= res.num_pts:
assert len(res.raw_data) == 0
if len(res.raw_data) == 0:
assert line_num >= res.num_pts
return data

def request_symbols_by_sic(self, sic: int, timeout=None) -> np.array:
# SBS,[Search String],[RequestID]<CR><LF>
req_id = self._get_next_req_id()
self._setup_request_data(req_id)
req_cmd = "SBS,%d,%s\r\n" % (sic, req_id)
self.send_cmd(req_cmd)
self._req_event[req_id].wait(timeout=timeout)
data = self.read_symbols_with_sect(req_id)
if data.dtype == object:
err_msg = "Request: %s, Error: %s" % (req_cmd, str(data[0]))
raise RuntimeError(err_msg)
else:
return data

def request_symbols_by_naic(self, naic: int, timeout=None) -> np.array:
# SBN,[Search String],[RequestID]<CR><LF>
req_id = self._get_next_req_id()
self._setup_request_data(req_id)
req_cmd = "SBS,%d,%s\r\n" % (naic, req_id)
self.send_cmd(req_cmd)
self._req_event[req_id].wait(timeout=timeout)
data = self.read_symbols_with_sect(req_id)
if data.dtype == object:
err_msg = "Request: %s, Error: %s" % (req_cmd, str(data[0]))
raise RuntimeError(err_msg)
else:
return data


# if __name__ == "__main__":
# from service import FeedService
# from passwords import dtn_login, dtn_password, dtn_product_id
#
# svc = FeedService(product=dtn_product_id, version="Debugging", login=dtn_login, password=dtn_password,
# autoconnect=True, savelogininfo=True)
# svc.launch()
#
# admin_conn = AdminConn(name="RunningInIde")
# admin_conn.start_runner()
# admin_conn.set_admin_variables_from_dict(svc.admin_variables())
# admin_conn.client_stats_on()
# time.sleep(30)
# admin_conn.client_stats_off()
# admin_conn.stop_runner()

#
# quote_conn = QuoteConn(name="RunningInIDE")
# quote_conn.start_runner()
#
Expand All @@ -1680,7 +1845,7 @@ class LookupConn(FeedConn):
# print("Unwatched")
# time.sleep(3)
# quote_conn.stop_runner()

#
# hist_conn = HistoryConn(name="RunningInIde")
# hist_conn.start_runner()
#
Expand Down Expand Up @@ -1732,6 +1897,20 @@ class LookupConn(FeedConn):
# print(table_conn.get_trade_conditions())
# print(table_conn.get_sic_codes())
# print(table_conn.get_naic_codes())


#
# lookup_conn = LookupConn(name="RunningInIDE")
# lookup_conn.start_runner()
#
# tesla_syms = lookup_conn.request_symbols_by_filter(search_term='INTC',
# search_field='s')
# print(tesla_syms)
#
# sic_symbols = lookup_conn.request_symbols_by_sic(83)
# print(sic_symbols)
#
# naic_symbols = lookup_conn.request_symbols_by_naic(10)
# print(naic_symbols)
#
# lookup_conn.stop_runner()
#

0 comments on commit 871f435

Please sign in to comment.