11from typing import Any , Dict , List , Union
22from io import BufferedIOBase , TextIOWrapper
33from tempfile import _TemporaryFileWrapper
4+ import datetime as dt
45import logging
6+ import json
57import time
68
79from pydantic import validate_arguments
810
911from cs_tools .data .enums import Privilege
1012from cs_tools .errors import InsufficientPrivileges , TSLoadServiceUnreachable
1113from cs_tools .const import (
12- FMT_TSLOAD_DATETIME , FMT_TSLOAD_DATE , FMT_TSLOAD_TIME , FMT_TSLOAD_TRUE_FALSE
14+ FMT_TSLOAD_DATETIME , FMT_TSLOAD_DATE , FMT_TSLOAD_TIME , FMT_TSLOAD_TRUE_FALSE ,
15+ APP_DIR
1316)
1417
1518
@@ -25,22 +28,96 @@ class TSLoadMiddleware:
2528 """
2629 def __init__ (self , ts ):
2730 self .ts = ts
31+ # The load server resides on a different port compared to standard ThoughtSpot
32+ # services. This is because the service tends to carry heavy file-load
33+ # operations, and having a separate web server creates the needed isolation
34+ # between standard ThoughtSpot services and tsload operations. By default, this
35+ # service runs on all nodes of a ThoughtSpot cluster. This provides load
36+ # distribution to address possible simultaneous loads. The tsload server uses
37+ # its own load balancer. If an external load balancer is used, the tsload
38+ # requests must be sticky, and the tsload load balancer should be disabled.
39+ #
40+ # To turn off the load balancer, issue the following tscli commands
41+ # tscli --adv service add-gflag etl_http_server.etl_http_server etl_server_enable_load_balancer false
42+ # tscli --adv service add-gflag etl_http_server.etl_http_server etl_server_always_expose_node_ip true
43+ #
44+ # DEV NOTE
45+ # On each public method in this middleware, a keyword argument called
46+ # `ignore_node_redirect` which will remove the redirection logic from
47+ # further calls to the tsload service api. Since this is handled on a
48+ # client-by-client basis with no input from the API itself, we expose it as
49+ # a kwarg.
50+ #
51+ # Further reading:
52+ # https://docs.thoughtspot.com/latest/admin/loading/load-with-tsload.html
53+ #
54+ self ._cache_fp = APP_DIR / '.cache/tsload-node-redirect-by-cycle-id.json'
55+
56+ def _cache_node_redirect (self , cycle_id : str , * , node_info : Dict = None ) -> Dict [str , Dict ]:
57+ """
58+ Method is a total hack.
59+ """
60+ try :
61+ with self ._cache_fp .open (mode = 'r' ) as j :
62+ cache = json .load (j )
63+ except FileNotFoundError :
64+ cache = {}
65+
66+ # nothing to write, or we should be reading
67+ if node_info is None :
68+ return cache
69+
70+ # write to cache
71+ now = dt .datetime .utcnow ().timestamp ()
72+ cache [cycle_id ] = {** node_info , 'load_datetime' : now }
73+
74+ # keep only recent data
75+ cache = {
76+ cycle : details
77+ for cycle , details in cache .items ()
78+ if (now - details ['load_datetime' ]) <= (10 * 86400 ) # 10 days
79+ }
80+
81+ with self ._cache_fp .open (mode = 'w' ) as j :
82+ json .dump (cache , j , indent = 4 , sort_keys = True )
83+
84+ return cache
85+
86+ def _check_for_redirect_auth (self , cycle_id : str ) -> None :
87+ """
88+ Attempt a login.
89+
90+ By default, the tsload service API sits behind a load balancer. When we first
91+ init a new load cycle, the balancer will respond with the proper node (if
92+ applicable) to submit file uploads to. If that node is not the main node, then
93+ we will be required to authorize again.
94+ """
95+ cache = self ._cache_node_redirect (cycle_id )
96+
97+ if cycle_id in cache :
98+ ds = self .ts .api .ts_dataservice
99+ ds ._tsload_node = cache [cycle_id ]['host' ]
100+ ds ._tsload_port = cache [cycle_id ]['port' ]
101+ log .debug (f'redirecting to: { ds .etl_server_fullpath } ' )
102+ ds .load_auth ()
28103
29104 def _check_privileges (self ) -> None :
30105 """
106+ Determine if the user has necessary Data Manager privileges.
31107 """
32108 if not set (self .ts .me .privileges ).intersection (REQUIRED_PRIVILEGES ):
33109 raise InsufficientPrivileges (
34110 user = self .ts .me ,
35111 service = 'remote TQL' ,
36- required_privileges = REQUIRED_PRIVILEGES
112+ required_privileges = ', ' . join ( REQUIRED_PRIVILEGES )
37113 )
38114
39115 @validate_arguments (config = dict (arbitrary_types_allowed = True ))
40116 def upload (
41117 self ,
42118 fd : Union [BufferedIOBase , TextIOWrapper , _TemporaryFileWrapper ],
43119 * ,
120+ ignore_node_redirect : bool = False ,
44121 database : str ,
45122 table : str ,
46123 schema_ : str = 'falcon_default_schema' ,
@@ -85,6 +162,11 @@ def upload(
85162 fp : pathlib.Path
86163 file to load to thoughtspot
87164
165+ ignore_node_redirect : bool [default: False]
166+ whether or not to ignore node redirection
167+
168+ **tsload_options
169+
88170 Returns
89171 -------
90172 cycle_id
@@ -150,17 +232,43 @@ def upload(
150232 http_error = e
151233 )
152234
153- cycle_id = r .json ()['cycle_id' ]
154- self .ts .api .ts_dataservice .load_start (cycle_id , fd = fd )
155- self .ts .api .ts_dataservice .load_commit (cycle_id )
156- return cycle_id
235+ data = r .json ()
236+ self ._cache_node_redirect (data ['cycle_id' ], node_info = data .get ('node_address' , None ))
237+
238+ if not ignore_node_redirect :
239+ self ._check_for_redirect_auth (data ['cycle_id' ])
240+
241+ self .ts .api .ts_dataservice .load_start (data ['cycle_id' ], fd = fd )
242+ self .ts .api .ts_dataservice .load_commit (data ['cycle_id' ])
243+ return data ['cycle_id' ]
157244
158245 @validate_arguments
159- def status (self , cycle_id : str , * , wait_for_complete : bool = False ):
246+ def status (
247+ self ,
248+ cycle_id : str ,
249+ * ,
250+ ignore_node_redirect : bool = False ,
251+ wait_for_complete : bool = False
252+ ) -> Dict [str , Any ]:
160253 """
254+ Get the status of a previously started data load.
255+
256+ Parameters
257+ ----------
258+ cycle_id : str
259+ data load to check on
260+
261+ ignore_node_redirect : bool [default: False]
262+ whether or not to ignore node redirection
263+
264+ wait_for_complete: bool [default: False]
265+ poll the load server until it responds with OK or ERROR
161266 """
162267 self ._check_privileges ()
163268
269+ if not ignore_node_redirect :
270+ self ._check_for_redirect_auth (cycle_id = cycle_id )
271+
164272 while True :
165273 r = self .ts .api .ts_dataservice .load_status (cycle_id )
166274 data = r .json ()
0 commit comments