Skip to content

Commit 09d9767

Browse files
author
boonhapus
committed
Merge branch 'v1.3.0-hot-fixes' into dev
2 parents a6d3821 + 321bcf6 commit 09d9767

File tree

4 files changed

+80
-46
lines changed

4 files changed

+80
-46
lines changed

cs_tools/api/middlewares/tsload.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from typing import Any, Dict, List, Union
2-
from io import BufferedIOBase
2+
from io import BufferedIOBase, TextIOWrapper
33
from tempfile import _TemporaryFileWrapper
44
import logging
5+
import time
56

67
from pydantic import validate_arguments
78

@@ -38,7 +39,7 @@ def _check_privileges(self) -> None:
3839
@validate_arguments(config=dict(arbitrary_types_allowed=True))
3940
def upload(
4041
self,
41-
fd: Union[BufferedIOBase, _TemporaryFileWrapper],
42+
fd: Union[BufferedIOBase, TextIOWrapper, _TemporaryFileWrapper],
4243
*,
4344
database: str,
4445
table: str,
@@ -154,21 +155,29 @@ def upload(
154155
self.ts.api.ts_dataservice.load_commit(cycle_id)
155156
return cycle_id
156157

157-
# @validate_arguments
158-
# def status(self, cycle_id: str, *, wait_for_complete: bool = False):
159-
# """
160-
# """
161-
# self._check_privileges()
158+
@validate_arguments
159+
def status(self, cycle_id: str, *, wait_for_complete: bool = False):
160+
"""
161+
"""
162+
self._check_privileges()
163+
164+
while True:
165+
r = self.ts.api.ts_dataservice.load_status(cycle_id)
166+
data = r.json()
167+
168+
if not wait_for_complete:
169+
break
162170

163-
# while True:
164-
# r = self.ts.api.ts_dataservice.load_status(cycle_id)
165-
# data = r.json()
171+
if data['internal_stage'] == 'COMMITTING':
172+
pass
173+
elif data['internal_stage'] == 'DONE':
174+
break
175+
elif data['status']['message'] != 'OK':
176+
break
166177

167-
# if not wait_for_complete:
168-
# break
178+
time.sleep(1)
169179

170-
# print(data)
171-
# raise
180+
return data
172181

173182
# @validate_arguments
174183
# def bad_records(self, cycle_id: str) -> List[Dict[str, Any]]:

cs_tools/cli/tools/rtsload/app.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@
3737
def status(
3838
ctx: typer.Context,
3939
cycle_id: str=A_(..., help='data load cycle id'),
40-
bad_records: str = O_(
41-
None,
42-
'--bad_records_file',
43-
help='file to use for storing rows that failed to load',
44-
metavar='protocol://DEFINITION.toml',
45-
callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
46-
)
40+
# bad_records: str = O_(
41+
# None,
42+
# '--bad_records_file',
43+
# help='file to use for storing rows that failed to load',
44+
# metavar='protocol://DEFINITION.toml',
45+
# callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
46+
# )
4747
):
4848
"""
4949
Get the status of a data load.
@@ -59,18 +59,21 @@ def status(
5959
f'\nIgnored rows: {data["ignored_row_count"]}'
6060
)
6161

62-
if data['ignored_row_count'] > 0:
63-
now = dt.datetime.now().strftime('%Y-%m-%dT%H_%M_%S')
64-
fp = f'BAD_RECORDS_{now}_{cycle_id}'
65-
console.print(
66-
f'[red]\n\nBad records found...\n\twriting to {bad_records.directory / fp}'
67-
)
68-
data = ts.tsload.bad_records(cycle_id)
69-
bad_records.dump(fp, data=data)
62+
# if int(data['ignored_row_count']) > 0:
63+
# now = dt.datetime.now().strftime('%Y-%m-%dT%H_%M_%S')
64+
# fp = f'BAD_RECORDS_{now}_{cycle_id}'
65+
# console.print(
66+
# f'[red]\n\nBad records found...\n\twriting to {bad_records.directory / fp}'
67+
# )
68+
# data = ts.tsload.bad_records(cycle_id)
69+
# bad_records.dump(fp, data=data)
7070

7171
if data['status']['code'] == 'LOAD_FAILED':
7272
console.print(f'\nFailure reason:\n [red]{data["status"]["message"]}[/]')
7373

74+
if data.get('parsing_errors', False):
75+
console.print(f'[red]{data["parsing_errors"]}')
76+
7477

7578
@app.command(cls=CSToolsCommand)
7679
@depends(
@@ -96,13 +99,13 @@ def file(
9699
has_header_row: bool = O_(False, '--has_header_row', show_default=False, help='indicates that the input file contains a header row'),
97100
escape_character: str = O_('"', '--escape_character', help='specifies the escape character used in the input file'),
98101
enclosing_character: str = O_('"', '--enclosing_character', help='enclosing character in csv source format'),
99-
bad_records: str = O_(
100-
None,
101-
'--bad_records_file',
102-
help='file to use for storing rows that failed to load',
103-
metavar='protocol://DEFINITION.toml',
104-
callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
105-
),
102+
# bad_records: str = O_(
103+
# None,
104+
# '--bad_records_file',
105+
# help='file to use for storing rows that failed to load',
106+
# metavar='protocol://DEFINITION.toml',
107+
# callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
108+
# ),
106109
flexible: bool = O_(False, '--flexible', show_default=False, help='whether input data file exactly matches target schema', hidden=True)
107110
):
108111
"""
@@ -120,9 +123,9 @@ def file(
120123
#
121124

122125
opts = {
123-
'target_database': target_database,
124-
'target_table': target_table,
125-
'target_schema': target_schema,
126+
'database': target_database,
127+
'table': target_table,
128+
'schema_': target_schema,
126129
'empty_target': empty_target,
127130
'max_ignored_rows': max_ignored_rows,
128131
'date_format': date_format,
@@ -138,10 +141,11 @@ def file(
138141
'enclosing_character': enclosing_character
139142
}
140143

141-
with console.status(f'[bold green]Loading {file} to ThoughtSpot..'):
142-
cycle_id = ts.tsload.upload(ts, fp=file, **opts, verbose=True)
144+
with console.status(f'[bold green]Loading [yellow]{file}[/] to ThoughtSpot..'):
145+
with file.open('r', encoding='utf-8', newline='') as fd:
146+
cycle_id = ts.tsload.upload(fd, **opts)
143147

144-
console.log(f'Data load cycle_id: {cycle_id}')
148+
console.log(f'Data load cycle_id: [cyan]{cycle_id}')
145149

146150
# if bad_records is None:
147151
# return

cs_tools/sync/falcon/syncer.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ class Falcon:
2424
database: str = 'cs_tools'
2525
schema_: str = Field('falcon_default_schema', alias='schema')
2626
empty_target: bool = True
27+
timeout: float = 60.0
2728

2829
# DATABASE ATTRIBUTES
2930
__is_database__ = True
3031
metadata = None
3132

3233
def __post_init_post_parse__(self):
34+
self.timeout = self.timeout or None
3335
ctx = click.get_current_context()
3436
self.engine = sa.engine.create_mock_engine('sqlite://', self.intercept_create_table)
3537
self.cnxn = self.engine.connect()
@@ -51,7 +53,11 @@ def intercept_create_table(self, sql, *multiparams, **params):
5153
if 'ts_bi_server' in q:
5254
return
5355

54-
self.ts.tql.command(command=f'{q};', database=self.database)
56+
self.ts.tql.command(
57+
command=f'{q};',
58+
database=self.database,
59+
http_timeout=self.timeout
60+
)
5561

5662
def ensure_setup(self, metadata, cnxn, **kw):
5763

@@ -69,8 +75,14 @@ def ensure_setup(self, metadata, cnxn, **kw):
6975
)
7076

7177
# create the database and schema if it doesn't exist
72-
self.ts.tql.command(command=f'CREATE DATABASE {self.database};')
73-
self.ts.tql.command(command=f'CREATE SCHEMA {self.database}.{self.schema_};')
78+
self.ts.tql.command(
79+
command=f'CREATE DATABASE {self.database};', http_timeout=self.timeout
80+
)
81+
82+
self.ts.tql.command(
83+
command=f'CREATE SCHEMA {self.database}.{self.schema_};',
84+
http_timeout=self.timeout
85+
)
7486

7587
def capture_metadata(self, metadata, cnxn, **kw):
7688
self.metadata = metadata
@@ -88,7 +100,11 @@ def load(self, table: str) -> List[Dict[str, Any]]:
88100
t = self.metadata.tables[table]
89101
q = t.select().compile(dialect=self.engine.dialect)
90102
q = str(q).strip()
91-
r = self.ts.tql.query(statement=f'{q};', database=self.database)
103+
r = self.ts.tql.query(
104+
statement=f'{q};',
105+
database=self.database,
106+
http_timeout=self.timeout
107+
)
92108
d = next(_['data'] for _ in r if 'data' in _) # there will be only 1 response
93109
return d
94110

docs/syncer/falcon.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ Falcon is __ThoughtSpot__'s proprietary in-memory database that exists as part o
3636
<br/>*<span class=fc-mint>default</span>:* `true`
3737
<br/>*a* `TRUNCATE` *statement will be issued prior to loading any data loads if* `true` *is used*
3838
39+
> __timeout__{ .fc-blue }: <span class=fc-coral>optional</span>, number of seconds to wait for individual commands to Falcon
40+
<br/>*<span class=fc-mint>default</span>:* `60.0`
41+
<br/>*if the timeout value is set to* `0` *then calls over to Falcon will never reach a timeout threshold*
42+
3943

4044
??? question "How do I use the Falcon syncer in commands?"
4145

@@ -62,6 +66,7 @@ Falcon is __ThoughtSpot__'s proprietary in-memory database that exists as part o
6266
database = 'cs_tools'
6367
schema = 'falcon_default_schema'
6468
empty_target = true
69+
timeout = 5.0
6570
```
6671

6772
[gh-issue25]: https://github.com/thoughtspot/cs_tools/issues/25

0 commit comments

Comments
 (0)