Skip to content

Commit 321bcf6

Browse files
author
boonhapus
committed
🐛 tql.status should be implemented, hey ?
1 parent e001afd commit 321bcf6

File tree

2 files changed

+55
-42
lines changed

2 files changed

+55
-42
lines changed

cs_tools/api/middlewares/tsload.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from typing import Any, Dict, List, Union
2-
from io import BufferedIOBase
2+
from io import BufferedIOBase, TextIOWrapper
33
from tempfile import _TemporaryFileWrapper
44
import logging
5+
import time
56

67
from pydantic import validate_arguments
78

@@ -38,7 +39,7 @@ def _check_privileges(self) -> None:
3839
@validate_arguments(config=dict(arbitrary_types_allowed=True))
3940
def upload(
4041
self,
41-
fd: Union[BufferedIOBase, _TemporaryFileWrapper],
42+
fd: Union[BufferedIOBase, TextIOWrapper, _TemporaryFileWrapper],
4243
*,
4344
database: str,
4445
table: str,
@@ -154,21 +155,29 @@ def upload(
154155
self.ts.api.ts_dataservice.load_commit(cycle_id)
155156
return cycle_id
156157

157-
# @validate_arguments
158-
# def status(self, cycle_id: str, *, wait_for_complete: bool = False):
159-
# """
160-
# """
161-
# self._check_privileges()
158+
@validate_arguments
159+
def status(self, cycle_id: str, *, wait_for_complete: bool = False):
160+
"""
161+
"""
162+
self._check_privileges()
163+
164+
while True:
165+
r = self.ts.api.ts_dataservice.load_status(cycle_id)
166+
data = r.json()
167+
168+
if not wait_for_complete:
169+
break
162170

163-
# while True:
164-
# r = self.ts.api.ts_dataservice.load_status(cycle_id)
165-
# data = r.json()
171+
if data['internal_stage'] == 'COMMITTING':
172+
pass
173+
elif data['internal_stage'] == 'DONE':
174+
break
175+
elif data['status']['message'] != 'OK':
176+
break
166177

167-
# if not wait_for_complete:
168-
# break
178+
time.sleep(1)
169179

170-
# print(data)
171-
# raise
180+
return data
172181

173182
# @validate_arguments
174183
# def bad_records(self, cycle_id: str) -> List[Dict[str, Any]]:

cs_tools/cli/tools/rtsload/app.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@
3737
def status(
3838
ctx: typer.Context,
3939
cycle_id: str=A_(..., help='data load cycle id'),
40-
bad_records: str = O_(
41-
None,
42-
'--bad_records_file',
43-
help='file to use for storing rows that failed to load',
44-
metavar='protocol://DEFINITION.toml',
45-
callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
46-
)
40+
# bad_records: str = O_(
41+
# None,
42+
# '--bad_records_file',
43+
# help='file to use for storing rows that failed to load',
44+
# metavar='protocol://DEFINITION.toml',
45+
# callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
46+
# )
4747
):
4848
"""
4949
Get the status of a data load.
@@ -59,18 +59,21 @@ def status(
5959
f'\nIgnored rows: {data["ignored_row_count"]}'
6060
)
6161

62-
if data['ignored_row_count'] > 0:
63-
now = dt.datetime.now().strftime('%Y-%m-%dT%H_%M_%S')
64-
fp = f'BAD_RECORDS_{now}_{cycle_id}'
65-
console.print(
66-
f'[red]\n\nBad records found...\n\twriting to {bad_records.directory / fp}'
67-
)
68-
data = ts.tsload.bad_records(cycle_id)
69-
bad_records.dump(fp, data=data)
62+
# if int(data['ignored_row_count']) > 0:
63+
# now = dt.datetime.now().strftime('%Y-%m-%dT%H_%M_%S')
64+
# fp = f'BAD_RECORDS_{now}_{cycle_id}'
65+
# console.print(
66+
# f'[red]\n\nBad records found...\n\twriting to {bad_records.directory / fp}'
67+
# )
68+
# data = ts.tsload.bad_records(cycle_id)
69+
# bad_records.dump(fp, data=data)
7070

7171
if data['status']['code'] == 'LOAD_FAILED':
7272
console.print(f'\nFailure reason:\n [red]{data["status"]["message"]}[/]')
7373

74+
if data.get('parsing_errors', False):
75+
console.print(f'[red]{data["parsing_errors"]}')
76+
7477

7578
@app.command(cls=CSToolsCommand)
7679
@depends(
@@ -96,13 +99,13 @@ def file(
9699
has_header_row: bool = O_(False, '--has_header_row', show_default=False, help='indicates that the input file contains a header row'),
97100
escape_character: str = O_('"', '--escape_character', help='specifies the escape character used in the input file'),
98101
enclosing_character: str = O_('"', '--enclosing_character', help='enclosing character in csv source format'),
99-
bad_records: str = O_(
100-
None,
101-
'--bad_records_file',
102-
help='file to use for storing rows that failed to load',
103-
metavar='protocol://DEFINITION.toml',
104-
callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
105-
),
102+
# bad_records: str = O_(
103+
# None,
104+
# '--bad_records_file',
105+
# help='file to use for storing rows that failed to load',
106+
# metavar='protocol://DEFINITION.toml',
107+
# callback=lambda ctx, to: SyncerProtocolType().convert(to, ctx=ctx)
108+
# ),
106109
flexible: bool = O_(False, '--flexible', show_default=False, help='whether input data file exactly matches target schema', hidden=True)
107110
):
108111
"""
@@ -120,9 +123,9 @@ def file(
120123
#
121124

122125
opts = {
123-
'target_database': target_database,
124-
'target_table': target_table,
125-
'target_schema': target_schema,
126+
'database': target_database,
127+
'table': target_table,
128+
'schema_': target_schema,
126129
'empty_target': empty_target,
127130
'max_ignored_rows': max_ignored_rows,
128131
'date_format': date_format,
@@ -138,10 +141,11 @@ def file(
138141
'enclosing_character': enclosing_character
139142
}
140143

141-
with console.status(f'[bold green]Loading {file} to ThoughtSpot..'):
142-
cycle_id = ts.tsload.upload(ts, fp=file, **opts, verbose=True)
144+
with console.status(f'[bold green]Loading [yellow]{file}[/] to ThoughtSpot..'):
145+
with file.open('r', encoding='utf-8', newline='') as fd:
146+
cycle_id = ts.tsload.upload(fd, **opts)
143147

144-
console.log(f'Data load cycle_id: {cycle_id}')
148+
console.log(f'Data load cycle_id: [cyan]{cycle_id}')
145149

146150
# if bad_records is None:
147151
# return

0 commit comments

Comments
 (0)