Skip to content

Commit 6545a8c

Browse files
Merge pull request #41 from fund3/dev/builtin_session_refresh
Dev/builtin session refresh
2 parents 9a5fc5c + f6918a8 commit 6545a8c

File tree

6 files changed

+149
-23
lines changed

6 files changed

+149
-23
lines changed

omega_client/communication/omega_connection.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -478,13 +478,14 @@ def configure_default_omega_connection(omega_endpoint: str,
478478
response_receiver = ResponseReceiver(ZMQ_CONTEXT,
479479
RESPONSE_RECEIVER_ENDPOINT,
480480
response_handler)
481-
omega_connection = OmegaConnection(ZMQ_CONTEXT,
482-
omega_endpoint,
483-
REQUEST_SENDER_ENDPOINT,
484-
RESPONSE_RECEIVER_ENDPOINT,
485-
request_sender,
486-
response_receiver,
487-
server_zmq_encryption_key=omega_server_key)
481+
omega_connection = OmegaConnection(
482+
ZMQ_CONTEXT,
483+
omega_endpoint,
484+
REQUEST_SENDER_ENDPOINT,
485+
RESPONSE_RECEIVER_ENDPOINT,
486+
request_sender,
487+
response_receiver,
488+
server_zmq_encryption_key=omega_server_key)
488489
return omega_connection, request_sender, response_receiver
489490

490491

@@ -513,14 +514,16 @@ def configure_single_client_omega_connection(omega_endpoint: str,
513514
REQUEST_SENDER_ENDPOINT,
514515
client_id,
515516
sender_comp_id)
517+
response_handler.set_request_sender(request_sender=request_sender)
516518
response_receiver = ResponseReceiver(ZMQ_CONTEXT,
517519
RESPONSE_RECEIVER_ENDPOINT,
518520
response_handler)
519-
omega_connection = OmegaConnection(ZMQ_CONTEXT,
520-
omega_endpoint,
521-
REQUEST_SENDER_ENDPOINT,
522-
RESPONSE_RECEIVER_ENDPOINT,
523-
request_sender,
524-
response_receiver,
525-
server_zmq_encryption_key=omega_server_key)
521+
omega_connection = OmegaConnection(
522+
ZMQ_CONTEXT,
523+
omega_endpoint,
524+
REQUEST_SENDER_ENDPOINT,
525+
RESPONSE_RECEIVER_ENDPOINT,
526+
request_sender,
527+
response_receiver,
528+
server_zmq_encryption_key=omega_server_key)
526529
return omega_connection, request_sender, response_receiver

omega_client/communication/request_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from omega_client.fpg.fpg_lib import create_SOR_order, FPGAuth
1111
from omega_client.messaging.common_types import AccountBalancesReport, \
12-
AccountCredentials, AccountInfo, AuthorizationGrant, AuthorizationRefresh, \
12+
AccountCredentials, AccountInfo, AuthorizationRefresh, \
1313
CompletedOrdersReport, ExchangePropertiesReport, \
1414
ExecutionReport, OpenPositionsReport, Order, OrderInfo, \
1515
OrderType, RequestHeader, TimeInForce, WorkingOrdersReport, Batch, OPO, OCO

omega_client/communication/single_client_request_sender.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
import zmq
55

6-
from omega_client.messaging.common_types import AccountCredentials, AccountInfo, \
7-
Order, OrderInfo, OrderType, RequestHeader, TimeInForce, Batch, OPO, OCO
6+
from omega_client.messaging.common_types import AccountCredentials, \
7+
AccountInfo, AuthorizationRefresh, Batch, OCO, OPO, \
8+
Order, OrderInfo, OrderType, RequestHeader, TimeInForce
89
from omega_client.communication.request_sender import RequestSender
910

1011

@@ -160,3 +161,9 @@ def request_completed_orders(self, account_info: AccountInfo,
160161
def request_exchange_properties(self, exchange: str):
161162
return self._request_sender.request_exchange_properties(
162163
request_header=self._request_header, exchange=exchange)
164+
165+
def request_authorization_refresh(self,
166+
auth_refresh: AuthorizationRefresh):
167+
return self._request_sender.request_authorization_refresh(
168+
request_header=self._request_header, auth_refresh=auth_refresh
169+
)

omega_client/messaging/response_handler.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def on_logon_ack(self,
104104
:param logon_ack: (LogonAck) LogonAck message from Omega.
105105
:param client_id: (int) client_id of the response.
106106
:param sender_comp_id: (str) sender_comp_id of the response.
107-
:param request_id: (int) request_id which requested this response
107+
:param request_id: (int) request_id which requested this response.
108108
"""
109109

110110
@abstractmethod
@@ -208,8 +208,8 @@ def on_completed_orders_report(self,
208208
@abstractmethod
209209
def on_exchange_properties_report(self,
210210
report: ExchangePropertiesReport,
211-
client_id,
212-
sender_comp_id,
211+
client_id: int,
212+
sender_comp_id: str,
213213
request_id: int):
214214
"""
215215
Override in subclass to handle Omega ExchangePropertiesReport response.
@@ -222,8 +222,8 @@ def on_exchange_properties_report(self,
222222
@abstractmethod
223223
def on_authorization_grant(self,
224224
authorization_grant: AuthorizationGrant,
225-
client_id,
226-
sender_comp_id,
225+
client_id: int,
226+
sender_comp_id: str,
227227
request_id: int):
228228
"""
229229
Override in subclass to handle Omega AuthorizationGrant response.
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
from datetime import datetime as dt
2+
import logging
3+
from threading import Timer
4+
5+
from omega_client.messaging.common_types import AccountBalancesReport, \
6+
AccountCredentials, AccountDataReport, AuthorizationGrant, \
7+
AuthorizationRefresh, CompletedOrdersReport, ExchangePropertiesReport, \
8+
ExecutionReport, LogoffAck, LogonAck, OpenPositionsReport, SystemMessage,\
9+
WorkingOrdersReport
10+
from omega_client.messaging.response_handler import ResponseHandler
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class SingleClientResponseHandler(ResponseHandler):
16+
###########################################################################
17+
# #
18+
# ~~~~~~~~~~~~~~~~~~~~~~~~~ Incoming OmegaMessages ~~~~~~~~~~~~~~~~~~~~~~ #
19+
# #
20+
###########################################################################
21+
def __init__(self, refresh_buffer_time: float = 30.):
22+
self._command_dispatcher = {
23+
'heartbeat': self.on_heartbeat,
24+
'test': self.on_test_message,
25+
'serverTime': self.on_server_time,
26+
'system': self.on_system_message,
27+
'logonAck': self._on_logon_ack,
28+
'logoffAck': self.on_logoff_ack,
29+
'executionReport': self.on_exec_report,
30+
'accountDataReport': self.on_account_data,
31+
'workingOrdersReport': self.on_working_orders_report,
32+
'accountBalancesReport': self.on_account_balances,
33+
'openPositionsReport': self.on_open_positions,
34+
'completedOrdersReport': self.on_completed_orders_report,
35+
'exchangePropertiesReport': self.on_exchange_properties_report,
36+
'authorizationGrant': self._on_authorization_grant
37+
}
38+
self._request_sender = None
39+
self._refresh_token = None
40+
self._REFRESH_BUFFER_TIME = refresh_buffer_time
41+
42+
def set_request_sender(self, request_sender):
43+
self._request_sender = request_sender
44+
45+
def _on_logon_ack(self,
46+
logon_ack: LogonAck,
47+
client_id: int,
48+
sender_comp_id: str,
49+
request_id: int):
50+
"""
51+
Internal
52+
:param logon_ack: (LogonAck) LogonAck message from Omega.
53+
:param client_id: (int) client_id of the response.
54+
:param sender_comp_id: (str) sender_comp_id of the response.
55+
:param request_id: (int) request_id which requested this response
56+
:return:
57+
"""
58+
if (logon_ack and logon_ack.success and logon_ack.authorization_grant
59+
and logon_ack.authorization_grant.success
60+
):
61+
self._request_sender.set_access_token(
62+
logon_ack.authorization_grant.access_token)
63+
self._refresh_token = logon_ack.authorization_grant.refresh_token
64+
self._send_authorization_refresh()
65+
else:
66+
if not logon_ack.success:
67+
logger.error('logon_ack error: ',
68+
extra={'message': logon_ack.message})
69+
if not logon_ack.authorization_grant.success:
70+
logger.error(
71+
'authorization_grant error: ',
72+
extra={'message': logon_ack.authorization_grant.message})
73+
self.on_logon_ack(
74+
logon_ack=logon_ack,
75+
client_id=client_id,
76+
sender_comp_id=sender_comp_id,
77+
request_id=request_id)
78+
79+
def _on_authorization_grant(self,
80+
authorization_grant: AuthorizationGrant,
81+
client_id: int,
82+
sender_comp_id: str,
83+
request_id: int):
84+
"""
85+
Override in subclass to handle Omega AuthorizationGrant response.
86+
:param authorization_grant: AuthorizationGrant python object
87+
:param client_id: (int) client_id of the response.
88+
:param sender_comp_id: (str) sender_comp_id of the response.
89+
:param request_id: (int) request_id which requested this response
90+
"""
91+
if authorization_grant and authorization_grant.success:
92+
self._request_sender.set_access_token(
93+
authorization_grant.access_token)
94+
self._refresh_token = authorization_grant.refresh_token
95+
self._token_expire_time = authorization_grant.expire_at
96+
time_until_session_refresh = (
97+
self._token_expire_time - dt.utcnow().timestamp() -
98+
self._REFRESH_BUFFER_TIME)
99+
Timer(time_until_session_refresh,
100+
self._send_authorization_refresh).start()
101+
else:
102+
if not authorization_grant.success:
103+
logger.error(
104+
'authorization_grant error: ',
105+
extra={'message': authorization_grant.message})
106+
self.on_authorization_grant(
107+
authorization_grant=authorization_grant,
108+
client_id=client_id,
109+
sender_comp_id=sender_comp_id,
110+
request_id=request_id)
111+
112+
def _send_authorization_refresh(self):
113+
self._request_sender.request_authorization_refresh(
114+
auth_refresh=AuthorizationRefresh(
115+
refresh_token=self._refresh_token)
116+
)

omega_protocol/omega_protocol

0 commit comments

Comments
 (0)