Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 67 additions & 26 deletions fixcli
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ of the session.

"""

from future import standard_library
standard_library.install_aliases()
from builtins import bytes
from builtins import str
from builtins import range
from builtins import object
import os;
import sys;
import string;
Expand All @@ -42,15 +48,25 @@ import struct;
import argparse;
import getpass;
import asyncore, socket;
import ConfigParser;
import configparser;
import signal;
from collections import OrderedDict;
import threading;

# ./fixcli:44: DeprecationWarning: The asyncore module is deprecated
# and will be removed in Python 3.12. The recommended replacement is
# asyncio.
#
# For Python 3.12+ install https://pypi.org/project/pyasyncore/
import warnings
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
import asyncore

logging.basicConfig(format='%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s');
logger = logging.getLogger(__file__);
logger.setLevel(logging.CRITICAL);
config = ConfigParser.RawConfigParser();
config = configparser.RawConfigParser();
cfgfile = os.path.join('/'.join(os.path.abspath(__file__).split('/')[:-1]), '.' + os.path.basename(__file__).replace('.py', '') + 'rc');


Expand All @@ -59,7 +75,7 @@ class FIXMessage(object):
FIXMessage holds a single FIX (Financial Information Exchange) message.
"""

def __init__(self, msgtype=None, seqnum=None, sender=None, target=None, testreqid=None, raw=None):
def __init__(self, msgtype=None, seqnum=None, sender=None, target=None, sender_sub_id=None, testreqid=None, raw=None):
"""
The constructor takes in either FIX message tags/values or raw FIX message. If the
input is message tags/values, it creates a raw message. If the input is a raw
Expand All @@ -69,15 +85,16 @@ class FIXMessage(object):
:param seqnum: message sequence number, MsgSeqNum <34>
:param sender: the identity of a firm sending a message, SenderCompID <49>
:param target: the identity of a firm receiving a message, TargetCompID <56>
:param payload: raw FIX message
:param sender_sub_id: the sub-identity of a firm sending a message, SenderSubID <50>. Only used for Logon.
:param raw: raw FIX message
:return: either None or integer value
"""
self._load_msg_schema();
self.error = False;
if msgtype:
self.outgoing = True;
self.body = b'';
if msgtype not in self.schema['MsgType'].keys():
if msgtype not in list(self.schema['MsgType'].keys()):
logger.error('message type \'' + msgtype + '\' is invalid');
self.error = True;
if not seqnum:
Expand All @@ -99,18 +116,21 @@ class FIXMessage(object):
self.seqnum = seqnum;
self.sender = sender;
self.target = target;
self.sender_sub_id = sender_sub_id;
self.fields = OrderedDict({
'MsgType': self.schema['MsgType'][self.msgtype],
'MsgSeqNum': str(self.seqnum),
'SenderCompID': str(self.sender),
'TargetCompID': str(self.target),
'SendingTime': self.timestamp,
'SendingTime': self.timestamp,
});
if self.msgtype == 'Logon':
self.encryption_method = 0;
self.heartbeat = 30;
self.fields['EncryptMethod'] = str(self.encryption_method);
self.fields['HeartBtInt'] = str(self.heartbeat);
if self.sender_sub_id:
self.fields['SenderSubID'] = self.sender_sub_id;
elif self.msgtype == 'Resend Request':
self.fields['BeginSeqNo'] = '1';
self.fields['EndSeqNo'] = '0';
Expand All @@ -122,11 +142,12 @@ class FIXMessage(object):
pass;
self.version = 'FIX.4.2';
for f in self.fields:
self.body += bytes(str(self.tag_ids[f]) + '=' + self.fields[f]) + b'\x01';
self.header = bytes(str(self.tag_ids['BeginString']) + '=' + self.version) + b'\x01';
self.header += bytes(str(self.tag_ids['BodyLength']) + '=' + str(len(self.body))) + b'\x01';
self.checksum = sum([ord(i) for i in list(self.header + self.body)]) % 256;
self.raw = self.header + self.body + bytes(str(self.tag_ids['CheckSum']) + '=' + str(self.checksum).zfill(3)) + b'\x01';
self.body += bytes(str(self.tag_ids[f]) + '=' + self.fields[f], encoding='utf8') + b'\x01';
self.header = bytes(str(self.tag_ids['BeginString']) + '=' + self.version, encoding='utf8') + b'\x01';
self.header += bytes(str(self.tag_ids['BodyLength']) + '=' + str(len(self.body)), encoding='utf8') + b'\x01';
self.checksum = sum(list(self.header + self.body)) % 256;
self.raw = self.header + self.body + bytes(str(self.tag_ids['CheckSum']) + '=' + str(self.checksum).zfill(3), encoding='utf8') + b'\x01';

else:
self.raw = raw;
self.fields = OrderedDict({});
Expand All @@ -146,11 +167,16 @@ class FIXMessage(object):
value_s = value + ' (' + self.schema[self.tag_ids[tag]][value] + ')';
self.fields[tag_s] = value;
if tag_s == 'MsgType':
self.msgtype = self.schema['MsgType'][value];
try:
self.msgtype = self.schema['MsgType'][value];
except Exception as e:
logger.error('unsupported MsgType: ' + value);
elif tag_s == 'SenderCompID':
self.sender = value;
elif tag_s == 'TargetCompID':
self.target = value;
elif tag_s == 'SenderSubID':
self.sender_sub_id = value;
elif tag_s == 'MsgSeqNum':
self.seqnum = int(value);
elif tag_s == 'BeginString':
Expand All @@ -169,7 +195,7 @@ class FIXMessage(object):
self.pretty += str(' * {0: <30} : {1: <5} : {2}').format(tag_s, tag, value_s) + '\n';
prettylen = len(max(self.pretty.split('\n'), key=len)) + 1;
self.pretty = '\n' + str('-' * prettylen) + '\n' + self.pretty + str('-' * prettylen) + '\n';

return;


Expand Down Expand Up @@ -206,6 +232,7 @@ class FIXMessage(object):
'OrigSendingTime': '122',
'SenderCompID': '49',
'TargetCompID': '56',
'SenderSubID': '50',
'SecurityExchange': '207',
'MsgSeqNum': '34',
'SendingTime': '52',
Expand All @@ -227,9 +254,13 @@ class FIXMessage(object):
'Heartbeat':'0',
'Test Request':'1',
'Resend Request':'2',
'Reject':'3',
'Sequence Reset':'4',
'Logout':'5',
'ExecutionReport':'8',
'Order-Single':'D',
'Order Cancel Request':'F',
'Market Data-Incremental Refresh': 'X',
},
};

Expand All @@ -251,7 +282,7 @@ class FIXSession(asyncore.dispatcher):
asyncore.dispatcher.__init__(self);
signal.signal(signal.SIGINT, self._exit);
self.seqnum = None;
for key, value in kwargs.iteritems():
for key, value in kwargs.items():
if key == 'host':
self.host = value;
elif key == 'port':
Expand All @@ -262,6 +293,8 @@ class FIXSession(asyncore.dispatcher):
self.sender = value;
elif key == 'target':
self.target = value;
elif key == 'sender_sub_id':
self.sender_sub_id = value;
elif key == 'venue':
self.venue = value;
elif key == 'username':
Expand All @@ -273,7 +306,7 @@ class FIXSession(asyncore.dispatcher):
else:
logger.error('unsupported function argument: ' + str(key) + '=' + str(value));

self.sid = self.sender + '-' + self.target;
self.sid = self.sender + '-' + self.target
self.connected = False;
self.authenticated = False;
self._reading = False;
Expand All @@ -293,8 +326,8 @@ class FIXSession(asyncore.dispatcher):
self.seqnum = 1;
self.create_socket(socket.AF_INET, socket.SOCK_STREAM);
self.connect((self.host, self.port));
self.read_buffer = b'';
fixmsg = FIXMessage(msgtype='Logon', seqnum=self.seqnum, sender=self.sender, target=self.target);
self.read_buffer = '';
fixmsg = FIXMessage(msgtype='Logon', seqnum=self.seqnum, sender=self.sender, target=self.target, sender_sub_id=self.sender_sub_id);
self.seqnum +=1;
self.write_buffer = fixmsg.raw;
return;
Expand All @@ -313,8 +346,9 @@ class FIXSession(asyncore.dispatcher):
data = self.recv(65535);
if data:
self._reading = True;
data = data.decode(encoding='utf8')
logger.info('Incoming message: ' + ''.join(data).replace('\x01', '^'));
logger.info('-' * 60);
logger.info('-' * 60);
self.read_buffer += data;
rcvd = self._decode_fix_msg(self.read_buffer);
self.read_buffer = self.read_buffer[rcvd:];
Expand All @@ -329,8 +363,9 @@ class FIXSession(asyncore.dispatcher):
return;
if self.writable():
self._writing = True;
logger.info('Outbound message: ' + str(self.write_buffer).replace('\x01', '^'));
self._decode_fix_msg(self.write_buffer);
write_buffer_str = self.write_buffer.decode(encoding='utf-8')
logger.info('Outbound message: ' + write_buffer_str.replace('\x01', '^'));
self._decode_fix_msg(write_buffer_str);
sent = self.send(self.write_buffer);
self.write_buffer = self.write_buffer[sent:];
self._writing = False;
Expand All @@ -354,11 +389,12 @@ class FIXSession(asyncore.dispatcher):
logger.info('Connection closed by ' + self.host);
else:
logger.info('Connection closed.');

if not config.has_section(self.sid):
config.add_section(self.sid);
config.set(self.sid, 'date', self._get_date());
config.set(self.sid, 'msgseqnum', str(self.seqnum));
with open(cfgfile, 'wb') as f:
with open(cfgfile, 'w') as f:
config.write(f);
pass;

Expand All @@ -385,10 +421,11 @@ class FIXSession(asyncore.dispatcher):
else:
last = 0;
bytes_parsed += 5;
msg_buffer = msg_buffer[(pos+5):];
msg_buffer = msg_buffer[(pos+5):];
i += 1;
if pos < 0:
break;

if len(parts) > 0:
if re.search('\x0110=\d{3}(\x01|\x00)?$', msg_buffer):
bytes_parsed += len(msg_buffer);
Expand All @@ -400,7 +437,7 @@ class FIXSession(asyncore.dispatcher):
logger.error('failed to find FIX checksum');
bytes_parsed -= 5;
else:
logger.error('failed to find FIX header');
logger.error('failed to find FIX header');
except:
logger.error(str(sys.exc_info()));
pass;
Expand Down Expand Up @@ -445,7 +482,7 @@ class FIXSession(asyncore.dispatcher):
rc = args[0];
for a in args:
pass;
for k, v in kwargs.iteritems():
for k, v in kwargs.items():
pass;
if self.connected:
timeout_timer = threading.Timer(10.0, self._trigger_timeout);
Expand All @@ -465,7 +502,7 @@ class FIXSession(asyncore.dispatcher):
config.set(self.sid, 'date', self._get_date());
config.set(self.sid, 'msgseqnum', str(self.seqnum));
logger.info('Saving FIX session information ...');
with open(cfgfile, 'wb') as f:
with open(cfgfile, 'w') as f:
config.write(f);
logger.info('OK');
else:
Expand All @@ -491,6 +528,8 @@ def main():
type=str, help='Sending firm');
main_group.add_argument('-t', '--target', dest='target', metavar='TARGET_COMP_ID', required=True, \
type=str, help='Receiving firm');
main_group.add_argument('--sender-sub-id', dest='sender_sub_id', metavar='SENDER_SUBID', required=False, \
type=str, help='Sending firm sub-id');
main_group.add_argument('-e', '--exchange', dest='venue', metavar='EXCHANGE', required=False, \
type=str, help='Trading Venue/Exchange');
main_group.add_argument('-n', '--seqnum', dest='seqnum', metavar='SEQNUM', required=False, \
Expand All @@ -501,7 +540,7 @@ def main():
action='append', choices=['logout', 'keepalive', 'dormant', 'gapfill'], \
help='Mode of operation (default: dormant)');
parser.add_argument('-l', '--log-level', dest='ilog', metavar='LEVEL', type=int, default=0, \
choices=range(1, 3), help='Log level (default: 0, max: 2)');
choices=list(range(1, 3)), help='Log level (default: 0, max: 2)');
args = parser.parse_args();

if args.ilog == 1:
Expand Down Expand Up @@ -534,6 +573,8 @@ def main():
kwargs['seqnum'] = args.seqnum;
if args.venue:
kwargs['venue'] = args.venue;
if args.sender_sub_id:
kwargs['sender_sub_id'] = args.sender_sub_id;

if args.username:
kwargs['username'] = args.username;
Expand Down