Skip to content

Commit

Permalink
limit to 5 messages per second, debugging ws_gateway latency issues, …
Browse files Browse the repository at this point in the history
…collecting browser_finger_print and stunt_ip on the verification form
  • Loading branch information
pinhopro committed Apr 9, 2015
1 parent 0f87197 commit 4853028
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 40 deletions.
56 changes: 18 additions & 38 deletions apps/ws_gateway/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self, application, request, **kwargs):

self.trade_client = None
self.user_response = None
self.last_order_datetime = datetime.now()
self.last_message_datetime = [datetime.now()]
self.open_orders = {}
self.md_subscriptions = {}
self.sec_status_subscriptions = {}
Expand Down Expand Up @@ -145,6 +145,23 @@ def on_message(self, raw_message):
if self.trade_client is None or not self.trade_client.isConnected():
return

self.last_message_datetime.append(datetime.now())
message_time_last_second = self.last_message_datetime[-1] - timedelta(seconds=1)
for x in xrange(0, len(self.last_message_datetime)):
if self.last_message_datetime[x] > message_time_last_second:
self.last_message_datetime = self.last_message_datetime[x:]
break
if len(self.last_message_datetime) > 5: # higher than 5 messages per second
self.application.log("ERROR",
"TOO_MANY_MESSAGES",
"Exceed 5 messages per second. [ip=" + self.remote_ip + ",'" + raw_message + "']")
self.write_message(
'{"MsgType":"ERROR", "Description":"Too many messages per second", "Detail": "6 messages in the last second"}')
self.application.unregister_connection(self)
self.trade_client.close()
self.close()
return

try:
req_msg = JsonMessage(raw_message)
except InvalidMessageException as e:
Expand Down Expand Up @@ -203,41 +220,6 @@ def on_message(self, raw_message):
self.on_security_status_request(req_msg)
return

if req_msg.isNewOrderSingle():
if self.last_order_datetime:
order_time = datetime.now()
order_time_less_one_second = order_time - timedelta(milliseconds=200) # max of 5 orders per second
if self.last_order_datetime > order_time_less_one_second:
#self.application.log('ORDER_REJECT', self.trade_client.connection_id, raw_message )
reject_message = {
"MsgType": "8",
"OrderID": None,
"ExecID": None,
"ExecType": "8",
"OrdStatus": "8",
"CumQty": 0,
"Symbol": req_msg.get("Symbol"),
"OrderQty": req_msg.get("Qty"),
"LastShares": 0,
"LastPx": 0,
"Price": req_msg.get("Price"),
"TimeInForce": "1",
"LeavesQty": 0,
"ExecSide": req_msg.get("Side"),
"Side": req_msg.get("Side"),
"OrdType": req_msg.get("OrdType"),
"CxlQty": req_msg.get("Qty"),
"ClOrdID": req_msg.get("ClOrdID"),
"AvgPx": 0,
"OrdRejReason": "Exceeded the maximum number of orders sent per second.",
"Volume": 0
}
#self.write_message(str(json.dumps(reject_message, cls=JsonEncoder)))
#return

self.last_order_datetime = datetime.now()


if req_msg.isDepositRequest():
if not req_msg.get('DepositMethodID') and not req_msg.get('DepositID'):

Expand All @@ -261,7 +243,6 @@ def on_message(self, raw_message):
if secret[0] in ('0','1','2','3','4','5','6','7','8','9'):
dest_wallet = cold_wallet


if not dest_wallet:
return

Expand All @@ -272,7 +253,6 @@ def on_message(self, raw_message):
'currency': currency
})


try:
url_payment_processor = self.application.options.url_payment_processor + '?' + parameters
self.application.log('DEBUG', self.trade_client.connection_id, "invoking..." + url_payment_processor )
Expand Down
10 changes: 9 additions & 1 deletion apps/ws_gateway/market_data_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from sqlalchemy.orm import scoped_session, sessionmaker

from datetime import datetime
import datetime

MDSUBSCRIBEDICT = {}

Expand Down Expand Up @@ -100,6 +100,8 @@ def get_trades(self, symbol, since):

def on_md_publish(self, publish_msg):
"""" on_md_publish. """
start = datetime.datetime.now()

topic = publish_msg[0]
raw_message = publish_msg[1]

Expand All @@ -111,6 +113,12 @@ def on_md_publish(self, publish_msg):
elif msg.type == 'X': # Incremental
self.on_md_incremental(msg)

finish = datetime.datetime.now()
self.application.log("DEBUG", "PERF", str([ (finish-start).total_seconds(),
"MarketDataSubscriber.on_md_publish",
"1",
[topic, raw_message] ] ) )

def on_md_full_refresh(self, msg):
"""" on_md_full_refresh. """
# TODO: Check if our current order book is sync with the full refresh
Expand Down
14 changes: 13 additions & 1 deletion apps/ws_gateway/verification_webhook_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def post(self, *args, **kwargs):
address_country = None
address_country_code = None
finger_print = None
stunt_ip = None

photo_fields = []
id_fields = []
Expand Down Expand Up @@ -100,6 +101,9 @@ def post(self, *args, **kwargs):
if 'finger_print' in key:
finger_print = value

if 'stunt_ip' in key:
stunt_ip = value

#form stack
if 'name-first' in key:
first_name = value
Expand Down Expand Up @@ -176,7 +180,6 @@ def post(self, *args, **kwargs):
'formID': formID,
'submissionID': submissionID,
'created_at': createdAt,
'browser_finger_print': finger_print,
'name': {
'first': first_name,
'middle': middle_name,
Expand All @@ -198,6 +201,15 @@ def post(self, *args, **kwargs):
'Verify': 1
}

if finger_print:
verify_request_message['VerificationData']['browser_finger_print'] = finger_print

try:
if stunt_ip:
verify_request_message['VerificationData']['stunt_ip'] = json.loads(stunt_ip)
except:
pass

for field in id_fields:
for key, value in raw_request.iteritems():
if field in key:
Expand Down

0 comments on commit 4853028

Please sign in to comment.