|
| 1 | +import argparse |
| 2 | +import datetime |
| 3 | +from dateutil.relativedelta import relativedelta |
| 4 | +import decimal |
| 5 | +import json |
| 6 | +import queue |
| 7 | +import time |
| 8 | + |
| 9 | +import boto3 |
| 10 | +from botocore.exceptions import ClientError |
| 11 | +from contracts import SecurityDefinition |
| 12 | + |
| 13 | +import ibapi.wrapper |
| 14 | +from ibapi import (comm) |
| 15 | +from ibapi.client import EClient |
| 16 | +from ibapi.common import * |
| 17 | +from ibapi.contract import Contract |
| 18 | +from ibapi.errors import * |
| 19 | +from ibapi.ticktype import TickType, TickTypeEnum |
| 20 | +from ibapi.utils import * |
| 21 | +from ibapi.utils import (BadMessage) |
| 22 | + |
| 23 | + |
| 24 | +# Helper class to convert a DynamoDB item to JSON. |
| 25 | +class DecimalEncoder(json.JSONEncoder): |
| 26 | + def default(self, o): |
| 27 | + if isinstance(o, decimal.Decimal): |
| 28 | + if o % 1 > 0: |
| 29 | + return float(o) |
| 30 | + else: |
| 31 | + return int(o) |
| 32 | + return super(DecimalEncoder, self).default(o) |
| 33 | + |
| 34 | + |
| 35 | +class Utils(object): |
| 36 | + def __init__(self): |
| 37 | + pass |
| 38 | + |
| 39 | + @staticmethod |
| 40 | + def reliable(func): |
| 41 | + def _decorator(self, *args, **kwargs): |
| 42 | + tries = 0 |
| 43 | + result = func(self, *args, **kwargs) |
| 44 | + if result is None: |
| 45 | + while result is None and tries < 10: |
| 46 | + tries += 1 |
| 47 | + time.sleep(2 ** tries) |
| 48 | + result = func(self, *args, **kwargs) |
| 49 | + return result |
| 50 | + |
| 51 | + return _decorator |
| 52 | + |
| 53 | + |
| 54 | +class InterruptableClient(EClient): |
| 55 | + def __init__(self): |
| 56 | + EClient.__init__(self, self) |
| 57 | + self.lastStamp = datetime.datetime.utcnow() |
| 58 | + |
| 59 | + def runnable(self, func): |
| 60 | + """This is the function that has the message loop.""" |
| 61 | + |
| 62 | + try: |
| 63 | + while not self.done and (self.conn.isConnected() |
| 64 | + or not self.msg_queue.empty()): |
| 65 | + try: |
| 66 | + try: |
| 67 | + text = self.msg_queue.get(block=True, timeout=0.2) |
| 68 | + if len(text) > MAX_MSG_LEN: |
| 69 | + self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), |
| 70 | + "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) |
| 71 | + self.disconnect() |
| 72 | + break |
| 73 | + except queue.Empty: |
| 74 | + if datetime.datetime.utcnow() - self.lastStamp > datetime.timedelta(seconds=30): |
| 75 | + func() |
| 76 | + self.lastStamp = datetime.datetime.utcnow() |
| 77 | + logging.debug("queue.get: empty") |
| 78 | + else: |
| 79 | + fields = comm.read_fields(text) |
| 80 | + logging.debug("fields %s", fields) |
| 81 | + self.decoder.interpret(fields) |
| 82 | + except (KeyboardInterrupt, SystemExit): |
| 83 | + logging.info("detected KeyboardInterrupt, SystemExit") |
| 84 | + self.keyboardInterrupt() |
| 85 | + self.keyboardInterruptHard() |
| 86 | + except BadMessage: |
| 87 | + logging.info("BadMessage") |
| 88 | + self.conn.disconnect() |
| 89 | + |
| 90 | + logging.debug("conn:%d queue.sz:%d", |
| 91 | + self.conn.isConnected(), |
| 92 | + self.msg_queue.qsize()) |
| 93 | + finally: |
| 94 | + self.disconnect() |
| 95 | + |
| 96 | + |
| 97 | +class IbApp(InterruptableClient, ibapi.wrapper.EWrapper): |
| 98 | + def __init__(self, start, end): |
| 99 | + self.__start = start.date() |
| 100 | + self.__end = end.date() |
| 101 | + self.months = int((end.date() - start.date()).days / 30) |
| 102 | + |
| 103 | + self.Logger = logging.getLogger() |
| 104 | + self.Logger.setLevel(logging.INFO) |
| 105 | + logging.basicConfig(format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s') |
| 106 | + InterruptableClient.__init__(self) |
| 107 | + self.nextValidOrderId = None |
| 108 | + self.nextValidReqId = None |
| 109 | + self.requestedHistoricalData = {} |
| 110 | + self.historicalLookup = {} |
| 111 | + self.sec = SecurityDefinition() |
| 112 | + db = boto3.resource('dynamodb', region_name='us-east-1') |
| 113 | + self.__Securities = db.Table('Securities') |
| 114 | + self.__QuotesEod = db.Table('Quotes.EOD.UAT') |
| 115 | + |
| 116 | + def __del__(self): |
| 117 | + self.disconnect() |
| 118 | + |
| 119 | + def UpdateQuote(self, symbol, date, opn, close, high, low, volume, barCount): |
| 120 | + try: |
| 121 | + details = {"Open": decimal.Decimal(str(opn)), "Close": decimal.Decimal(str(close)), |
| 122 | + "High": decimal.Decimal(str(high)), "Low": decimal.Decimal(str(low)), |
| 123 | + "Volume": volume, "Count": barCount} |
| 124 | + response = self.__QuotesEod.update_item( |
| 125 | + Key={ |
| 126 | + 'Symbol': symbol, |
| 127 | + 'Date': date, |
| 128 | + }, |
| 129 | + UpdateExpression="set #d = :d, #s = :s", |
| 130 | + ExpressionAttributeNames={ |
| 131 | + '#d': 'Details', |
| 132 | + '#s': 'Source', |
| 133 | + }, |
| 134 | + ExpressionAttributeValues={ |
| 135 | + ':d': details, |
| 136 | + ':s': 'IB', |
| 137 | + }, |
| 138 | + ReturnValues="UPDATED_NEW") |
| 139 | + |
| 140 | + except ClientError as e: |
| 141 | + self.Logger.error(e.response['Error']['Message']) |
| 142 | + except Exception as e: |
| 143 | + self.Logger.error(e) |
| 144 | + else: |
| 145 | + self.Logger.debug(json.dumps(response, indent=4, cls=DecimalEncoder)) |
| 146 | + |
| 147 | + def verify(self): |
| 148 | + self.Logger.info('requesting server time') |
| 149 | + self.reqCurrentTime() |
| 150 | + |
| 151 | + for key, value in self.requestedHistoricalData.items(): |
| 152 | + if value.lastTradeDateOrContractMonth != '': |
| 153 | + expiry = datetime.datetime.strptime(value.lastTradeDateOrContractMonth, '%Y%m%d') |
| 154 | + end = expiry.strftime('%Y%m%d %H:%M:%S') |
| 155 | + duration = "30 D" |
| 156 | + else: |
| 157 | + end = self.__end.strftime('%Y%m%d %H:%M:%S') |
| 158 | + duration = "%s M" % self.months |
| 159 | + |
| 160 | + self.reqHistoricalData(key, value, end, duration, "1 day", "TRADES", 1, 1, False, list("XYZ")) |
| 161 | + self.Logger.info('re-requesting Historical Data for ReqId: %s' % key) |
| 162 | + |
| 163 | + def loop(self): |
| 164 | + self.runnable(self.verify) |
| 165 | + |
| 166 | + def GetContract(self, date): |
| 167 | + symbol = self.sec.get_next_expiry('VX', date) |
| 168 | + exp = self.sec.get_next_expiry_date('VX', date) |
| 169 | + contract = ('VIX', 'FUT', 'CFE', 'VX', exp.strftime('%Y%m%d'), symbol) |
| 170 | + return contract |
| 171 | + |
| 172 | + def start(self): |
| 173 | + |
| 174 | + # items = [('VIX', 'FUT', 'CFE', 'VX', '20171220', 'VXZ7')] |
| 175 | + items = [('VIX', 'IND', 'CBOE', '', '', 'VIX')] |
| 176 | + # items = [('VIX', 'FUT', 'CFE', 'VX', '20180214', 'VXG8')] |
| 177 | + # items = [] |
| 178 | + nxt = self.__start |
| 179 | + while nxt <= self.__end: |
| 180 | + contract = self.GetContract(nxt) |
| 181 | + items.append(contract) |
| 182 | + nxt = nxt + relativedelta(months=1) |
| 183 | + |
| 184 | + for sym, typ, exch, tc, exp, loc in items: |
| 185 | + |
| 186 | + validated = Contract() |
| 187 | + validated.symbol = sym |
| 188 | + validated.secType = typ |
| 189 | + validated.exchange = exch |
| 190 | + validated.tradingClass = tc |
| 191 | + validated.lastTradeDateOrContractMonth = exp |
| 192 | + validated.includeExpired = True |
| 193 | + validated.localSymbol = loc |
| 194 | + |
| 195 | + hId = self.nextReqId() |
| 196 | + self.historicalLookup[hId] = validated.localSymbol |
| 197 | + self.requestedHistoricalData[hId] = validated |
| 198 | + if exp != '': |
| 199 | + expiry = datetime.datetime.strptime(exp, '%Y%m%d') |
| 200 | + end = expiry.strftime('%Y%m%d %H:%M:%S') |
| 201 | + duration = "30 D" |
| 202 | + else: |
| 203 | + end = self.__end.strftime('%Y%m%d %H:%M:%S') |
| 204 | + duration = "%s M" % self.months |
| 205 | + self.Logger.info('ReqId: %s. Requesting Historical %s %s %s %s %s %s' % (hId, sym, typ, exch, tc, exp, loc)) |
| 206 | + self.reqHistoricalData(hId, validated, end, duration, "1 day", "TRADES", 1, 1, False, list("XYZ")) |
| 207 | + |
| 208 | + def nextReqId(self): |
| 209 | + reqId = self.nextValidReqId |
| 210 | + self.nextValidReqId += 1 |
| 211 | + return reqId |
| 212 | + |
| 213 | + def nextOrderId(self): |
| 214 | + orderId = self.nextValidOrderId |
| 215 | + self.nextValidOrderId += 1 |
| 216 | + return orderId |
| 217 | + |
| 218 | + @iswrapper |
| 219 | + def historicalData(self, reqId: TickerId, bar: BarData): |
| 220 | + sym = self.historicalLookup[reqId] |
| 221 | + |
| 222 | + self.Logger.info("ReqId: " + str(reqId) + " HistoricalData. " + sym + " Date: " + bar.date + " Open: " |
| 223 | + + str(bar.open) + " High: " + str(bar.high) + " Low: " + str(bar.low) + " Close: " |
| 224 | + + str(bar.close) + " Volume: " + str(bar.volume) + " Count: " + str(bar.barCount)) |
| 225 | + if reqId in self.requestedHistoricalData: |
| 226 | + del self.requestedHistoricalData[reqId] |
| 227 | + |
| 228 | + self.UpdateQuote(sym, bar.date, bar.open, bar.close, bar.high, bar.low, bar.volume, bar.barCount) |
| 229 | + |
| 230 | + @iswrapper |
| 231 | + def historicalDataEnd(self, reqId: int, start: str, end: str): |
| 232 | + super(IbApp, self).historicalDataEnd(reqId, start, end) |
| 233 | + self.Logger.info("HistoricalDataEnd " + str(reqId) + " from " + start + " to " + end) |
| 234 | + |
| 235 | + @iswrapper |
| 236 | + def tickSnapshotEnd(self, reqId: int): |
| 237 | + super(IbApp, self).tickSnapshotEnd(reqId) |
| 238 | + self.Logger.info("TickSnapshotEnd: %s" % reqId) |
| 239 | + |
| 240 | + @iswrapper |
| 241 | + def nextValidId(self, orderId: int): |
| 242 | + super(IbApp, self).nextValidId(orderId) |
| 243 | + |
| 244 | + self.Logger.info("setting nextValidOrderId: %d" % orderId) |
| 245 | + self.nextValidOrderId = orderId |
| 246 | + self.nextValidReqId = orderId |
| 247 | + self.start() |
| 248 | + |
| 249 | + @iswrapper |
| 250 | + def marketDataType(self, reqId: TickerId, marketDataType: int): |
| 251 | + super(IbApp, self).marketDataType(reqId, marketDataType) |
| 252 | + self.Logger.info("MarketDataType. %s Type: %s" % (reqId, marketDataType)) |
| 253 | + |
| 254 | + @iswrapper |
| 255 | + def error(self, *args): |
| 256 | + super(IbApp, self).error(*args) |
| 257 | + |
| 258 | + @iswrapper |
| 259 | + def winError(self, *args): |
| 260 | + super(IbApp, self).error(*args) |
| 261 | + |
| 262 | + @iswrapper |
| 263 | + def currentTime(self, tim: int): |
| 264 | + super(IbApp, self).currentTime(tim) |
| 265 | + self.Logger.info('currentTime: %s' % tim) |
| 266 | + |
| 267 | + |
| 268 | +def main(): |
| 269 | + parser = argparse.ArgumentParser() |
| 270 | + parser.add_argument('--host', help='IB host', required=True) |
| 271 | + parser.add_argument('--port', help='IB port', type=int, required=True) |
| 272 | + parser.add_argument('--clientId', help='IB client id', type=int, required=True) |
| 273 | + parser.add_argument('--start', help='Start', type=lambda x: datetime.datetime.strptime(x, '%Y%m%d'), required=True) |
| 274 | + parser.add_argument('--end', help='End', type=lambda x: datetime.datetime.strptime(x, '%Y%m%d'), required=True) |
| 275 | + args = parser.parse_args() |
| 276 | + |
| 277 | + app = IbApp(args.start, args.end) |
| 278 | + app.connect(args.host, args.port, args.clientId) |
| 279 | + app.Logger.info("serverVersion:%s connectionTime:%s" % (app.serverVersion(), app.twsConnectionTime())) |
| 280 | + |
| 281 | + app.loop() |
| 282 | + |
| 283 | + |
| 284 | +if __name__ == "__main__": |
| 285 | + main() |
0 commit comments