2 changes: 1 addition & 1 deletion cs_tools/__project__.py
@@ -1,4 +1,4 @@
__version__ = "1.6.4"
__version__ = "1.6.5"
__docs__ = "https://thoughtspot.github.io/cs_tools/"
__repo__ = "https://github.com/thoughtspot/cs_tools"
__help__ = f"{__repo__}/discussions/"
6 changes: 6 additions & 0 deletions cs_tools/cli/commands/config.py
@@ -44,6 +44,7 @@ def create(
None, help="the password you type on the ThoughtSpot login screen, use [b magenta]prompt[/] to type it hidden"
),
secret: str = typer.Option(None, help="the trusted authentication secret key, found in the developer tab"),
concurrency: int = typer.Option(None, help="the maximum number of concurrent calls to send to ThoughtSpot, 15 by default"),
token: str = typer.Option(None, help="the V2 API bearer token"),
default_org: int = typer.Option(None, help="org ID to sign into by default"),
temp_dir: custom_types.Directory = typer.Option(None, help="the temporary directory to use for uploading files"),
@@ -80,6 +81,7 @@ def create(
"default_org": default_org,
"disable_ssl": disable_ssl,
"proxy": proxy,
"concurrency": concurrency,
},
"verbose": verbose,
"temp_dir": temp_dir or cs_tools_venv.subdir(".tmp"),
@@ -117,6 +119,7 @@ def modify(
),
secret: str = typer.Option(None, help="the trusted authentication secret key"),
token: str = typer.Option(None, help="the V2 API bearer token"),
concurrency: int = typer.Option(None, help="the maximum number of concurrent calls to send to ThoughtSpot, 15 by default"),
temp_dir: custom_types.Directory = typer.Option(None, help="the temporary directory to use for uploading files"),
disable_ssl: bool = typer.Option(
None, "--disable-ssl", help="whether or not to turn off checking the SSL certificate"
@@ -162,6 +165,9 @@ def modify(
if proxy is not None:
data["thoughtspot"]["proxy"] = proxy

if concurrency is not None:
data["thoughtspot"]["concurrency"] = concurrency

conf = CSToolsConfig.model_validate(data)
conf.save()

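The new concurrency value lands in the config's thoughtspot block above, but the code that consumes it is not part of this diff. Below is a minimal sketch of how such a cap is typically enforced with an asyncio-based HTTP client; httpx and the fetch_all helper are illustrative stand-ins, not CS Tools APIs.

import asyncio

import httpx  # assumption: an async HTTP client; httpx stands in for whatever CS Tools uses


async def fetch_all(urls: list[str], concurrency: int = 15) -> list[httpx.Response]:
    # Permit at most `concurrency` requests in flight at any one time.
    semaphore = asyncio.Semaphore(concurrency)

    async with httpx.AsyncClient() as client:

        async def fetch(url: str) -> httpx.Response:
            # The semaphore parks the (concurrency + 1)th caller until a slot frees up.
            async with semaphore:
                return await client.get(url)

        return await asyncio.gather(*(fetch(url) for url in urls))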
78 changes: 78 additions & 0 deletions cs_tools/cli/tools/searchable/api_transformer.py
@@ -665,3 +665,81 @@ def ts_audit_logs(data: list[_types.APIResult], *, cluster: _types.GUID) -> _typ
reshaped.sort(key=operator.itemgetter(*CLUSTER_KEY))

return reshaped


def ts_ai_stats(data: list[_types.APIResult], *, cluster: _types.GUID) -> _types.TableRowsFormat:
"""Reshapes /searchdata -> searchable.models.BIServer."""
reshaped: _types.TableRowsFormat = []

PARTITION_KEY = ft.partial(lambda r: r["ThoughtSpot Start Time"].date())
CLUSTER_KEY = ("ThoughtSpot Start Time", "User ID", "Visualization ID")

# KEEP TRACK OF DUPLICATE ROWS DUE TO DATA MANAGEMENT ISSUES.
seen: set[str] = set()

# ENSURE ALL DATA IS IN UTC PRIOR TO GENERATING ROW_NUMBERS.
data = [{**row, "ThoughtSpot Start Time": validators.ensure_datetime_is_utc.func(row["ThoughtSpot Start Time"])} for row in data]

# SORT PRIOR TO GROUP BY SO WE MAINTAIN CLUSTERING KEY SEMANTICS
data.sort(key=operator.itemgetter(*CLUSTER_KEY))

for row_date, rows in it.groupby(data, key=PARTITION_KEY):
# MANUAL ENUMERATION BECAUSE WE NEED TO ACCOUNT FOR DEDUPLICATION.
row_number = 0

for row in rows:
if (unique := f"{row['ThoughtSpot Start Time']}-{row['User ID']}-{row['Visualization ID']}") in seen:
continue

row_number += 1

reshaped.append(
models.AIStats.validated_init(
**{
"cluster_guid": cluster,
"sk_dummy": f"{cluster}-{row_date}-{row_number}",
"answer_session_id" : row["Answer Session ID"],
"query_latency" : row["Average Query Latency (External)"],
"system_latency" : row["Average System Latency (Overall)"],
"connection" : row["Connection"],
"connection_id" : row["Connection ID"],
"db_auth_type" : row["DB Auth Type"],
"db_type" : row["DB Type"],
"error_message" : row["Error Message"],
"external_database_query_id" : row["External Database Query ID"],
"impressions" : row["Impressions"],
"is_billable" : row["Is Billable"],
"is_system" : row["Is System"],
"model" : row["Model"],
"model_id" : row["Model ID"],
"object" : row["Object"],
"object_id" : row["Object ID"],
"object_subtype" : row["Object Subtype"],
"object_type" : row["Object Type"],
"org" : row["Org"],
"org_id" : row["Org ID"],
"query_count" : row["Query Count"],
"query_end_time" : row["Query End Time"],
"query_errors" : row["Query Errors"],
"query_start_time" : row["Query Start Time"],
"query_status" : row["Query Status"],
"sql_query" : row["SQL Query"],
"thoughtspot_query_id" : row["ThoughtSpot Query ID"],
"thoughtspot_start_time" : row["ThoughtSpot Start Time"],
"credits" : row["Total Credits"],
"nums_rows_fetched" : row["Total Nums Rows Fetched"],
"trace_id" : row["Trace ID"],
"user" : row["User"],
"user_action" : row["User Action"],
"user_action_count" : row["User Action Count"],
"user_count" : row["User Count"],
"user_display_name" : row["User Display Name"],
"user_id" : row["User ID"],
"visualization_id" : row["Visualization ID"],
}
).model_dump()
)

seen.add(unique)

return reshaped
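A note on the two patterns above: itertools.groupby only clusters adjacent items, so the sort on the clustering key must happen first, and the manual row_number exists so the sk_dummy surrogate keys stay dense after the seen set drops duplicates. A toy reproduction of both behaviors on made-up rows:

import itertools as it
import operator

rows = [
    {"day": "2025-09-15", "id": "b"},
    {"day": "2025-09-14", "id": "a"},
    {"day": "2025-09-15", "id": "b"},  # duplicate that must be skipped
]

# WITHOUT SORTING, groupby WOULD EMIT THE 2025-09-15 PARTITION TWICE.
rows.sort(key=operator.itemgetter("day", "id"))

seen: set[str] = set()

for day, group in it.groupby(rows, key=operator.itemgetter("day")):
    row_number = 0

    for row in group:
        if (unique := f"{row['day']}-{row['id']}") in seen:
            continue

        row_number += 1
        print(day, row_number, row["id"])  # 2025-09-14 1 a / 2025-09-15 1 b
        seen.add(unique)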
100 changes: 100 additions & 0 deletions cs_tools/cli/tools/searchable/app.py
@@ -773,3 +773,103 @@ def tml(
syncer.dump(models.MetadataTML.__tablename__, data=rows)

return 0

@app.command()
@depends_on(thoughtspot=ThoughtSpot())
def ts_ai_stats(
ctx: typer.Context,
syncer: Syncer = typer.Option(
...,
click_type=custom_types.Syncer(models=[models.AIStats]),
help="protocol and path for options to pass to the syncer",
rich_help_panel="Syncer Options",
),
from_date: custom_types.Date = typer.Option(..., help="inclusive lower bound of rows to select from TS: AI and BI Stats"),
to_date: custom_types.Date = typer.Option(..., help="inclusive upper bound of rows to select from TS: AI and BI Stats"),
org_override: str = typer.Option(None, "--org", help="The Org to switch to before performing actions."),
compact: bool = typer.Option(True, "--compact / --full", help="if compact, exclude rows where [User Action] is 'invalid' or {null}"),
) -> _types.ExitCode:
"""
Extract query performance metrics for each query made against an external database.

To extract one day of data, set [b cyan]--from-date[/] and [b cyan]--to-date[/] to the same value.
\b
Fields extracted from TS: AI and BI Stats
- Answer Session ID     - Average Query Latency (External)   - Average System Latency (Overall)   - Impressions
- Connection            - Connection ID                      - DB Auth Type                        - Is System
- DB Type               - Error Message                      - External Database Query ID          - Is Billable
- Model                 - Model ID                           - Object                              - Object ID
- Object Subtype        - Object Type                        - Org                                 - Org ID
- Query Count           - Query End Time                     - Query Errors                        - Query Start Time
- Query Status          - SQL Query                          - ThoughtSpot Query ID                - ThoughtSpot Start Time
- Total Credits         - Total Nums Rows Fetched            - Trace ID                            - User
- User Action           - User Action Count                  - User Count                          - User Display Name
- User ID               - Visualization ID
"""
assert isinstance(from_date, dt.date), f"Could not coerce from_date '{from_date}' to a date."
assert isinstance(to_date, dt.date), f"Could not coerce to_date '{to_date}' to a date."
ts = ctx.obj.thoughtspot

CLUSTER_UUID = ts.session_context.thoughtspot.cluster_id

TZ_UTC = zoneinfo.ZoneInfo("UTC")
TS_AI_TIMEZONE = TZ_UTC if ts.session_context.thoughtspot.is_cloud else ts.session_context.thoughtspot.timezone
print(f"TS_AI_TIMEZONE -> {TS_AI_TIMEZONE}")

if syncer.protocol == "falcon":
log.error("Falcon Syncer is not supported for TS: AI Server reflection.")
models.AIStats.__table__.drop(syncer.engine)
return 1

if (to_date - from_date) > dt.timedelta(days=31): # type: ignore[operator]
log.warning("Due to how the Search API functions, it's recommended to request no more than 1 month at a time.")

# DEV NOTE: @boonhapus
# As of 9.10.0.cl, TS: BI Server only resides in the Primary Org (0), so switch to it.
if ts.session_context.thoughtspot.is_orgs_enabled:
ts.switch_org(org_id=0)

if org_override is not None:
c = workflows.metadata.fetch_one(identifier=org_override, metadata_type="ORG", attr_path="id", http=ts.api)
_ = utils.run_sync(c)
org_override = _

SEARCH_DATA_DATE_FMT = "%m/%d/%Y"
SEARCH_TOKENS = (
"[Query Start Time] [Query Start Time].detailed [Query End Time] [Query End Time].detailed [Org]"
"[Query Status] [Connection] [User] [Nums Rows Fetched] [ThoughtSpot Query ID] [Is Billable] [ThoughtSpot Start Time]"
"[ThoughtSpot Start Time].detailed [User Action] [Is System] [Visualization ID] [External Database Query ID] [Query Latency (External)] "
"[Object] [User ID] [Org ID] [Credits] [Impressions] [Query Count] [Query Errors] [System Latency (Overall)] [User Action Count]"
"[User Action Count] [User Count] [Answer Session ID] [Connection ID] [DB Auth Type] [DB Type] [Error Message] [Model]"
"[Model ID] [Object ID] [Object Subtype] [Object Type] [SQL Query] [User Display Name] [Trace ID]"
"[ThoughtSpot Start Time].detailed [ThoughtSpot Start Time] != 'today'"
# FOR DATA QUALITY PURPOSES
# CONDITIONALS BASED ON CLI OPTIONS OR ENVIRONMENT
+ ("" if not compact else " [user action] != [user action].invalid [user action].{null}")
+ ("" if from_date is None else f" [ThoughtSpot Start Time] >= '{from_date.strftime(SEARCH_DATA_DATE_FMT)}'")
+ ("" if to_date is None else f" [ThoughtSpot Start Time] <= '{to_date.strftime(SEARCH_DATA_DATE_FMT)}'")
+ ("" if not ts.session_context.thoughtspot.is_orgs_enabled else " [org id]")
+ ("" if org_override is None else f" [org id] = {org_override}")
)

TOOL_TASKS = [
px.WorkTask(id="SEARCH", description="Fetching data from ThoughtSpot"),
px.WorkTask(id="CLEAN", description="Transforming API results"),
px.WorkTask(id="DUMP_DATA", description=f"Sending data to {syncer.name}"),
]

# DEV NOTE: @saurabhsingh1608, 09/15/2025
# Currently the worksheet name is "TS: AI and BI Stats (Beta)"; change it in the future as the need arises.

with px.WorkTracker("Fetching TS: AI and BI Stats", tasks=TOOL_TASKS) as tracker:
with tracker["SEARCH"]:
c = workflows.search(worksheet="TS: AI and BI Stats (Beta)", query=SEARCH_TOKENS, timezone=TS_AI_TIMEZONE, http=ts.api)
_ = utils.run_sync(c)

with tracker["CLEAN"]:
d = api_transformer.ts_ai_stats(data=_, cluster=CLUSTER_UUID)

with tracker["DUMP_DATA"]:
syncer.dump("ts_ai_stats", data=d)

return 0
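For reference, the --from-date/--to-date bounds enter the query purely as [ThoughtSpot Start Time] search tokens formatted with SEARCH_DATA_DATE_FMT. A standalone restatement of that assembly; date_window_tokens is an invented name for illustration only:

import datetime as dt

SEARCH_DATA_DATE_FMT = "%m/%d/%Y"


def date_window_tokens(from_date: dt.date, to_date: dt.date) -> str:
    # Render the inclusive lower/upper bounds exactly as the tool appends them.
    return (
        f" [ThoughtSpot Start Time] >= '{from_date.strftime(SEARCH_DATA_DATE_FMT)}'"
        f" [ThoughtSpot Start Time] <= '{to_date.strftime(SEARCH_DATA_DATE_FMT)}'"
    )


# One day of data: both bounds take the same value, as the docstring suggests.
print(date_window_tokens(dt.date(2025, 9, 15), dt.date(2025, 9, 15)))
# -> [ThoughtSpot Start Time] >= '09/15/2025' [ThoughtSpot Start Time] <= '09/15/2025'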
66 changes: 66 additions & 0 deletions cs_tools/cli/tools/searchable/models.py
@@ -365,6 +365,72 @@ def export_reserved_characters_are_escaped(self, query_text: Optional[str]) -> O

return query_text

class AIStats(ValidatedSQLModel, table=True):
__tablename__ = "ts_ai_stats"
cluster_guid: str = Field(primary_key=True)
sk_dummy: str = Field(primary_key=True)
answer_session_id: Optional[str]
query_latency: Optional[int]
system_latency: Optional[int]
connection: Optional[str]
connection_id: Optional[str]
db_auth_type: Optional[str]
db_type: Optional[str]
error_message: Optional[str]
external_database_query_id: Optional[str]
impressions: Optional[int]
is_billable: Optional[bool]
is_system: Optional[bool]
model: Optional[str]
model_id: Optional[str]
object: Optional[str]
object_id: Optional[str]
object_subtype: Optional[str]
object_type: Optional[str]
org: Optional[str]
org_id: int = 0
query_count: Optional[int]
query_end_time: dt.datetime = Field(sa_column=Column(TIMESTAMP))
query_errors: Optional[int]
query_start_time: dt.datetime = Field(sa_column=Column(TIMESTAMP))
query_status: Optional[str]
sql_query: Optional[str] = Field(sa_column=Column(Text, info={"length_override": "MAX"}))
thoughtspot_query_id: Optional[str]
thoughtspot_start_time: dt.datetime = Field(sa_column=Column(TIMESTAMP))
credits: Optional[int]
nums_rows_fetched: Optional[int]
trace_id: Optional[str]
user: Optional[str]
user_action: Optional[str]
user_action_count: Optional[int]
user_count: Optional[int]
user_display_name: Optional[str]
user_id: Optional[str]
visualization_id: Optional[str]


@pydantic.field_validator("thoughtspot_start_time", mode="before")
@classmethod
def check_valid_utc_datetime(cls, value: Any) -> dt.datetime:
return validators.ensure_datetime_is_utc.func(value)

@pydantic.field_validator("user_action", mode="after")
@classmethod
def ensure_is_case_sensitive_thoughtspot_enum_value(cls, value: Optional[str]) -> Optional[str]:
# Why not Annotated[str, pydantic.StringConstraints(to_upper=True)] ?
# sqlmodel#67: https://github.com/tiangolo/sqlmodel/issues/67
return None if value is None else value.upper()

@pydantic.field_serializer("sql_query")
def export_reserved_characters_are_escaped(self, sql_query: Optional[str]) -> Optional[str]:
if sql_query is None:
return sql_query
reserved_characters = ("\\",)

for character in reserved_characters:
sql_query = sql_query.replace(character, f"\\{character}")

return sql_query

METADATA_MODELS = [
Cluster,
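The sql_query field serializer in AIStats doubles backslashes on export so escape sequences survive the trip through a syncer. The same logic restated as a self-contained function, with a usage example:

from typing import Optional


def escape_reserved_characters(sql_query: Optional[str]) -> Optional[str]:
    # Double each backslash so downstream loaders do not swallow escape sequences.
    if sql_query is None:
        return sql_query

    reserved_characters = ("\\",)

    for character in reserved_characters:
        sql_query = sql_query.replace(character, f"\\{character}")

    return sql_query


print(escape_reserved_characters(r'SELECT "a\b" FROM t'))  # SELECT "a\\b" FROM t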