
Commit e3e4c54

boonhapus committed

Merge branch 'v1.6.2' into dev
2 parents 38bac3d + f25f97e commit e3e4c54

12 files changed: +169 −40 lines


cs_tools/__project__.py

Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
-__version__ = "1.6.1"
+__version__ = "1.6.2"
 __docs__ = "https://thoughtspot.github.io/cs_tools/"
 __repo__ = "https://github.com/thoughtspot/cs_tools"
 __help__ = f"{__repo__}/discussions/"

cs_tools/_types.py

Lines changed: 4 additions & 1 deletion

@@ -12,7 +12,10 @@
 # ==========
 # Meta types
 # ==========
-ExitCode: _compat.TypeAlias = Literal[0, 1]
+ExitSuccess: _compat.TypeAlias = Literal[0]
+ExitFailure: _compat.TypeAlias = Literal[1]
+ExitWarning: _compat.TypeAlias = Literal[2]
+ExitCode: _compat.TypeAlias = Literal[ExitSuccess, ExitFailure, ExitWarning]
 PathLike: _compat.TypeAlias = Union[str, os.PathLike, pathlib.Path]

 # ==========
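PEP 586 allows a Literal to be parameterized by other Literal aliases, so the new ExitCode still flattens to Literal[0, 1, 2]. A minimal sketch (hypothetical helper, not from this commit) of how a command can narrow its return type against these aliases:

from typing import Literal

ExitSuccess = Literal[0]
ExitFailure = Literal[1]
ExitWarning = Literal[2]
ExitCode = Literal[ExitSuccess, ExitFailure, ExitWarning]  # equivalent to Literal[0, 1, 2]

def finish(n_errors: int, n_warnings: int) -> ExitCode:
    """Collapse job results into the three documented exit states."""
    if n_errors:
        return 1
    if n_warnings:
        return 2
    return 0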

cs_tools/api/client.py

Lines changed: 23 additions & 0 deletions

@@ -337,6 +337,24 @@ def metadata_tml_import(
         options["import_policy"] = policy
         return self.post("api/rest/2.0/metadata/tml/import", headers=options.pop("headers", None), json=options)

+    @pydantic.validate_call(validate_return=True, config=validators.METHOD_CONFIG)
+    @_transport.CachePolicy.mark_cacheable
+    def metadata_tml_async_import(
+        self, tmls: list[str], policy: _types.TMLImportPolicy, **options: Any
+    ) -> Awaitable[httpx.Response]:
+        """Schedules a task to import TML files into ThoughtSpot."""
+        options["metadata_tmls"] = tmls
+        options["import_policy"] = policy
+        return self.post("api/rest/2.0/metadata/tml/async/import", headers=options.pop("headers", None), json=options)
+
+    @pydantic.validate_call(validate_return=True, config=validators.METHOD_CONFIG)
+    def metadata_tml_async_status(
+        self, include_import_response: bool = True, **options: Any
+    ) -> Awaitable[httpx.Response]:
+        """Fetches the status of a scheduled TML import task."""
+        options["include_import_response"] = include_import_response
+        return self.post("api/rest/2.0/metadata/tml/async/status", headers=options.pop("headers", None), json=options)
+
     # ==================================================================================
     # CONNECTIONS :: https://developers.thoughtspot.com/docs/rest-apiv2-reference#_connections
     # ==================================================================================

@@ -443,6 +461,11 @@ def tags_create(self, name: _types.Name, **options: Any) -> Awaitable[httpx.Resp
         options["name"] = name
         return self.post("api/rest/2.0/tags/create", json=options)

+    @pydantic.validate_call(validate_return=True, config=validators.METHOD_CONFIG)
+    def tags_delete(self, tag_identifier: _types.ObjectIdentifier, **options: Any) -> Awaitable[httpx.Response]:
+        """Deletes a tag object."""
+        return self.post(f"api/rest/2.0/tags/{tag_identifier}/delete", json=options)
+
     @pydantic.validate_call(validate_return=True, config=validators.METHOD_CONFIG)
     def tags_assign(
         self, guid: _types.ObjectIdentifier, tag: _types.ObjectIdentifier, **options: Any
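Taken together, the two new endpoints form a submit-then-poll pair. A minimal sketch (not part of the commit) of driving them directly, assuming an already-authenticated client named api; the response shapes (task_id, status_list, task_status, import_response) are inferred from the workflow change below:

import asyncio

async def import_and_wait(api, tmls: list[str]) -> dict:
    # Submit the TML payloads as a server-side task.
    r = await api.metadata_tml_async_import(tmls=tmls, policy="ALL_OR_NONE")
    r.raise_for_status()
    task_id = r.json()["task_id"]

    # Poll every 5 seconds until ThoughtSpot reports the task COMPLETED.
    while True:
        r = await api.metadata_tml_async_status(task_ids=[task_id])
        r.raise_for_status()
        status = next(iter(r.json()["status_list"]), {})
        if status.get("task_status") == "COMPLETED":
            # Mimic the synchronous endpoint's response shape.
            return status["import_response"]["object"]
        await asyncio.sleep(5)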

cs_tools/api/workflows/metadata.py

Lines changed: 56 additions & 13 deletions

@@ -369,25 +369,68 @@ async def tml_import(
     tmls: list[TMLObject],
     *,
     policy: _types.TMLImportPolicy = "ALL_OR_NONE",
+    use_async_endpoint: bool = False,
+    wait_for_completion: bool = False,
+    log_errors: bool = False,
     http: RESTAPIClient,
     **tml_import_options,
 ) -> _types.APIResult:
     """Import a metadata object, alerting about warnings and errors."""
-    r = await http.metadata_tml_import(tmls=[t.dumps() for t in tmls], policy=policy, **tml_import_options)
-    r.raise_for_status()
+    if use_async_endpoint:
+        _LOG.debug(f"Async import initiated on {len(tmls):,} objects (behave synchronously: {wait_for_completion}).")
+        r = await http.metadata_tml_async_import(tmls=[t.dumps() for t in tmls], policy=policy, **tml_import_options)
+        r.raise_for_status()
+        d = r.json()
+
+        _LOG.debug(f"RAW DATA\n{json.dumps(d, indent=2, default=str)}\n")
+
+        # IF WE'RE NOT WAITING FOR THE JOB TO COMPLETE, RETURN THE ASYNC JOB INFO DIRECTLY.
+        if not wait_for_completion:
+            return d

-    for tml_import_info, tml in zip(r.json(), tmls):
-        tml_type = tml.tml_type_name.upper()
+        async_job_id = d["task_id"]

-        if tml_import_info["response"]["status"]["status_code"] == "ERROR":
-            errors = tml_import_info["response"]["status"]["error_message"].replace("<br/>", "\n")
-            _LOG.error(f"{tml_type} '{tml.name}' failed to import, ThoughtSpot errors:\n[fg-error]{errors}")
+        # AFTER FIVE 5-SECOND ITERATIONS (25s), WE'LL ELEVATE THE LOGGING LEVEL.
+        n_iterations = 0

-        if tml_import_info["response"]["status"]["status_code"] == "WARNING":
-            errors = tml_import_info["response"]["status"]["error_message"].replace("<br/>", "\n")
-            _LOG.warning(f"{tml_type} '{tml.name}' partially imported, ThoughtSpot errors:\n[fg-warn]{errors}")
+        # OTHERWISE, PROCESS THE JOB AS IF IT WERE A SYNCHRONOUS PAYLOAD.
+        while d.get("task_status") != "COMPLETED":
+            log_level = logging.DEBUG if n_iterations < 5 else logging.INFO
+            n_iterations += 1
+            _LOG.log(log_level, f"Checking status of asynchronous import {async_job_id}")
+            _ = await asyncio.sleep(5)  # type: ignore[func-returns-value]
+            r = await http.metadata_tml_async_status(task_ids=[async_job_id])
+            r.raise_for_status()
+
+            # RAW DATA
+            _ = r.json()
+            _LOG.debug(f"RAW DATA\n{json.dumps(_, indent=2, default=str)}\n")

-        if tml_import_info["response"]["status"]["status_code"] == "OK":
-            _LOG.debug(f"{tml_type} '{tml.name}' successfully imported")
+            # TAKE THE FIRST STATUS (WE ONLY HAVE 1 JOB), BUT ONLY REASSIGN THE while LOOP VAR IF THE KEY EXISTS.
+            d = next(iter(_["status_list"]), d)
+            _LOG.log(log_level, f"TASK ID: {async_job_id}\n{json.dumps(d, indent=2, default=str)}\n")

-    return r.json()
+        # POST-PROCESSING TO MIMIC THE SYNCHRONOUS RESPONSE.
+        d = d["import_response"]["object"]
+
+    else:
+        r = await http.metadata_tml_import(tmls=[t.dumps() for t in tmls], policy=policy, **tml_import_options)
+        r.raise_for_status()
+        d = r.json()
+
+    if log_errors:
+        for tml_import_info, tml in zip(d, tmls):
+            tml_type = tml.tml_type_name.upper()
+
+            if tml_import_info["response"]["status"]["status_code"] == "ERROR":
+                errors = tml_import_info["response"]["status"]["error_message"].replace("<br/>", "\n")
+                _LOG.error(f"{tml_type} '{tml.name}' failed to import, ThoughtSpot errors:\n[fg-error]{errors}")
+
+            if tml_import_info["response"]["status"]["status_code"] == "WARNING":
+                errors = tml_import_info["response"]["status"]["error_message"].replace("<br/>", "\n")
+                _LOG.warning(f"{tml_type} '{tml.name}' partially imported, ThoughtSpot errors:\n[fg-warn]{errors}")
+
+            if tml_import_info["response"]["status"]["status_code"] == "OK":
+                _LOG.debug(f"{tml_type} '{tml.name}' successfully imported")
+
+    return d
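A usage sketch (not in the commit) of the new knobs; ts.api, utils.run_sync, my_tmls, and the import path stand in for an authenticated client, the project's sync runner, parsed TMLObjects, and the actual module layout:

from cs_tools.api import workflows  # assumed import path, matching the file layout

coro = workflows.metadata.tml_import(
    tmls=my_tmls,                # assumed: a list of parsed TMLObject instances
    policy="PARTIAL",
    use_async_endpoint=True,     # submit via /metadata/tml/async/import
    wait_for_completion=True,    # poll /metadata/tml/async/status until COMPLETED
    log_errors=True,             # re-enables per-object ERROR/WARNING logging
    http=ts.api,                 # assumed: an authenticated RESTAPIClient
)
results = utils.run_sync(coro)   # same shape as the synchronous endpoint's payload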

cs_tools/cli/custom_types.py

Lines changed: 4 additions & 0 deletions

@@ -1,3 +1,7 @@
+"""
+Custom Types turn unwieldy CLI input into structured python types.
+"""
+
 from __future__ import annotations

 from collections.abc import Iterator, Sequence

cs_tools/cli/tools/bulk-deleter/app.py

Lines changed: 11 additions & 2 deletions

@@ -256,7 +256,7 @@ def from_tag(
     ]

     with px.WorkTracker("Deleting objects", tasks=TOOL_TASKS) as tracker:
-        guids_to_delete: set[_types.GUID] = {tag["metadata_id"]}
+        guids_to_delete: set[_types.GUID] = set()

         with tracker["PREPARE"] as this_task:
             if not tag_only:

@@ -270,6 +270,9 @@ def from_tag(
             if directory is None:
                 this_task.skip()

+            elif tag_only:
+                this_task.skip()
+
             else:
                 this_task.total = len(guids_to_delete)

@@ -292,7 +295,7 @@ async def _download_and_advance(guid: _types.GUID) -> None:

         tracker.extra_renderable = lambda: Align.center(
             console.Group(
-                Align.center(f"{len(guids_to_delete):,} objects will be deleted"),
+                Align.center(f"{1 if tag_only else len(guids_to_delete):,} objects will be deleted"),
                 "\n[fg-warn]Press [fg-success]Y[/] to proceed, or [fg-error]n[/] to cancel.",
             )
         )

@@ -318,6 +321,12 @@ async def _download_and_advance(guid: _types.GUID) -> None:
             this_task.total = len(guids_to_delete)
             delete_attempts = collections.defaultdict(int)

+            if tag_only:
+                c = ts.api.tags_delete(tag_identifier=tag["metadata_id"])
+                _ = utils.run_sync(c)
+                this_task.advance(step=1)
+                return 0
+
             async def _delete_and_advance(guid: _types.GUID) -> None:
                 delete_attempts[guid] += 1
                 r = await ts.api.metadata_delete(guid=guid)

cs_tools/cli/tools/scriptability/app.py

Lines changed: 23 additions & 7 deletions

@@ -200,6 +200,10 @@ def checkpoint(
         mode="EXPORT",
         environment=environment,
         status=table.job_status,
+        info={
+            "files_expected": len(coros),
+            "files_exported": sum(s.status != "ERROR" for s in table.statuses),
+        },
     )

     # RECORD THE GUID MAPPING

@@ -325,7 +329,7 @@ def deploy(
         return 1
     except Exception:
         _LOG.debug("Error Info:", exc_info=True)
-        _LOG.error("One of your .mappings/<env>-guid-mappings.json is in an invalid state, see logs for details..")
+        _LOG.error("One of your .mappings/<env>-guid-mappings.json may be in an invalid state, see logs for details..")
         return 1

     tmls: dict[_types.GUID, _types.TMLObject] = {}

@@ -347,18 +351,18 @@ def deploy(
             tmls[guid] = mapping_info.disambiguate(tml=tml, delete_unmapped_guids=True)

     if not tmls:
-        _LOG.info(f"No TML files found to deploy from directory (Deploy Type: {deploy_type}, Last Seen: {last_import_dt})")
+        _LOG.info(
+            f"No TML files found to deploy from directory (Deploy Type: {deploy_type}, Last Seen: {last_import_dt})"
+        )
         return 0

-    # Silence the cs_tools metadata workflow logger since we've asked the User if they want logged feedback.
-    logging.getLogger("cs_tools.api.workflows.metadata").setLevel(logging.CRITICAL)
-
     try:
         c = workflows.metadata.tml_import(
             tmls=list(tmls.values()),
-            use_async_endpoint=use_async_endpoint,
-            skip_diff_check=skip_diff_check,
             policy=deploy_policy,
+            use_async_endpoint=use_async_endpoint,
+            wait_for_completion=use_async_endpoint,
+            log_errors=False,
             http=ts.api,
         )
         _ = utils.run_sync(c)

@@ -381,6 +385,12 @@ def deploy(
         mode="VALIDATE" if deploy_policy == "VALIDATE_ONLY" else "IMPORT",
         environment=target_environment,
         status=table.job_status,
+        info={
+            "deploy_type": deploy_type,
+            "deploy_policy": deploy_policy,
+            "files_expected": len(tmls),
+            "files_deployed": 0 if not table.can_map_guids else sum(s.status != "ERROR" for s in table.statuses),
+        },
     )

     # INJECT ERRORS WITH MORE INFO FOR OUR USERS' CLARITY.

@@ -421,4 +431,10 @@ def deploy(
         _LOG.error("One or more TMLs failed to fully deploy, check the logs or use --log-errors for more details.")
         return 1

+    if table.job_status == "WARNING":
+        _LOG.warning(
+            "TMLs imported successfully with one or more warnings. Check the logs or use --log-errors for more details."
+        )
+        return 2
+
     return 0
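The new WARNING branch gives automation a third state to key off, matching the ExitWarning alias added in _types.py. A hedged sketch; the exact CLI arguments are illustrative, not copied from the docs:

import subprocess

# Illustrative invocation; substitute your real subcommand and flags.
proc = subprocess.run(["cs_tools", "tools", "scriptability", "deploy", "--config", "dev"])

if proc.returncode == 0:
    print("deployed cleanly")
elif proc.returncode == 2:
    print("deployed with warnings; review logs before promoting further")
else:
    print("deploy failed")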

cs_tools/cli/tools/scriptability/utils.py

Lines changed: 19 additions & 4 deletions

@@ -67,6 +67,9 @@ class MappingCheckpoint(pydantic.BaseModel):
     status: _types.TMLStatusCode
     """The status of the checkpoint.. OK, WARNING, or ERROR."""

+    info: dict[str, Any] = pydantic.Field(default={})
+    """Arbitrary information about what happened."""
+
     @pydantic.field_serializer("at")
     @classmethod
     def serialize_datetime(self, value: Optional[dt.datetime]) -> Optional[str]:

@@ -169,7 +172,13 @@ def merge(cls, *, source: pathlib.Path, target: pathlib.Path) -> GUIDMappingInfo
         return target_env

     def checkpoint(
-        self, *, by: str, mode: Literal["EXPORT", "VALIDATE", "IMPORT"], environment: str, status: _types.TMLStatusCode
+        self,
+        *,
+        by: str,
+        mode: Literal["EXPORT", "VALIDATE", "IMPORT"],
+        environment: str,
+        status: _types.TMLStatusCode,
+        info: Optional[dict[str, Any]] = None,
     ) -> None:
         """Checkpoint the GUID mapping info."""
         if mode != "EXPORT" and not any(checkpoint.mode in ("EXPORT", "VALIDATE") for checkpoint in self.history):

@@ -182,6 +191,7 @@ def checkpoint(
                 mode=mode,
                 environment=environment,
                 status=status,
+                info=info or {},
             )
         )

@@ -332,16 +342,21 @@ def statuses(self) -> list[TMLStatus]:
     @property
     def can_map_guids(self) -> bool:
         """Determine if the statuses' GUIDs should be mapped."""
+        # GUIDs are returned, but we shouldn't map them since nothing actually imported.
         if self.operation == "VALIDATE":
             return False

+        # GUIDs should not be returned if any object failed during an ALL_OR_NONE import.
         if self.policy == "ALL_OR_NONE" and self.job_status != "OK":
             return False

-        if self.policy == "PARTIAL" and any(_.status != "ERROR" for _ in self.statuses):
-            return True
+        # All objects failed to IMPORT.
+        if all(_.status == "ERROR" for _ in self.statuses):
+            return False

-        return self.job_status != "ERROR"
+        # At this point, at least one GUID has been returned, EVEN IF the whole job was marked as a failure.
+        # We may have as little as 1 failure or 1 warning causing the job to be marked this way.
+        return True

     @property
     def job_status(self) -> _types.TMLStatusCode:
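A minimal sketch (hypothetical standalone function, not the class property) of the decision order the rewritten can_map_guids now encodes, with two checks of the PARTIAL-vs-ALL_OR_NONE behavior:

def can_map_guids(operation: str, policy: str, job_status: str, statuses: list[str]) -> bool:
    # Nothing imported during validation, so there is nothing to map.
    if operation == "VALIDATE":
        return False
    # ALL_OR_NONE rolls everything back unless the whole job succeeded.
    if policy == "ALL_OR_NONE" and job_status != "OK":
        return False
    # Every single object failed, so no GUIDs came back.
    if all(s == "ERROR" for s in statuses):
        return False
    # Otherwise at least one object imported, even if the job was flagged.
    return True

assert can_map_guids("IMPORT", "PARTIAL", "ERROR", ["OK", "ERROR"]) is True
assert can_map_guids("IMPORT", "ALL_OR_NONE", "ERROR", ["OK", "ERROR"]) is False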

cs_tools/cli/tools/searchable/app.py

Lines changed: 3 additions & 3 deletions

@@ -409,9 +409,9 @@ def metadata(

     # FETCH ALL ORG IDs WE'LL NEED TO COLLECT FROM
     if ts.session_context.thoughtspot.is_orgs_enabled:
-        c = workflows.metadata.fetch_one(identifier=org_override, metadata_type="ORG", http=ts.api)
-        _ = utils.run_sync(c)
-        orgs = [_]
+        c = ts.api.orgs_search()
+        r = utils.run_sync(c)
+        orgs = [_ for _ in r.json() if org_override is None or _["name"].casefold() == org_override.casefold()]
     else:
         orgs = [{"id": 0, "name": "ThoughtSpot"}]
cs_tools/sync/databricks/syncer.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class Databricks(DatabaseSyncer):
3333
schema_: Optional[str] = pydantic.Field(default="default", alias="schema")
3434
port: Optional[int] = 443
3535
temp_dir: Optional[pydantic.DirectoryPath] = pathlib.Path(".")
36+
use_legacy_dataload: bool = False
3637

3738
@pydantic.field_validator("access_token", mode="before")
3839
@classmethod
@@ -174,16 +175,28 @@ def dump(self, tablename: str, *, data: _types.TableRowsFormat) -> None:
174175
return
175176

176177
table = self.metadata.tables[f"{self.schema_}.{tablename}"]
177-
stage = self.stage_and_put(tablename=tablename, data=data)
178+
179+
if not self.use_legacy_dataload:
180+
stage = self.stage_and_put(tablename=tablename, data=data)
178181

179182
if self.load_strategy == "APPEND":
180-
self.copy_into(from_=stage, into=tablename)
183+
if self.use_legacy_dataload:
184+
sync_utils.batched(table.insert().values, session=self.session, data=data, max_parameters=250)
185+
else:
186+
self.copy_into(from_=stage, into=tablename)
181187

182188
if self.load_strategy == "TRUNCATE":
183189
self.session.execute(table.delete())
184-
self.copy_into(from_=stage, into=tablename)
190+
191+
if self.use_legacy_dataload:
192+
sync_utils.batched(table.insert().values, session=self.session, data=data, max_parameters=250)
193+
else:
194+
self.copy_into(from_=stage, into=tablename)
185195

186196
if self.load_strategy == "UPSERT":
187-
with self.temporary_table(table=table) as temp_table:
188-
self.copy_into(from_=stage, into=temp_table.name)
189-
self.merge_into(from_=temp_table.name, into=table)
197+
if self.use_legacy_dataload:
198+
sync_utils.generic_upsert(table, session=self.session, data=data, max_params=250)
199+
else:
200+
with self.temporary_table(table=table) as temp_table:
201+
self.copy_into(from_=stage, into=temp_table.name)
202+
self.merge_into(from_=temp_table.name, into=table)
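A minimal sketch of toggling the new flag when constructing the syncer directly in Python. Only use_legacy_dataload, schema, port, temp_dir, and access_token are confirmed by the diff above; the other connection fields are assumptions about the syncer's shape:

from cs_tools.sync.databricks.syncer import Databricks

# Hypothetical values throughout; fields not shown in the diff are assumed.
syncer = Databricks(
    server_hostname="adb-1234567890123456.7.azuredatabricks.net",  # assumed field
    http_path="/sql/1.0/warehouses/abc123",                        # assumed field
    access_token="dapi-REDACTED",
    schema="default",
    use_legacy_dataload=True,  # route writes through batched INSERTs instead of stage + COPY INTO
)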
