Skip to content

Commit 7c993b0

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Schema Enhancements with Descriptions, Partitioning, and Truncation Indicator
This update enhances the BigQuery agent analytics plugin: * **Schema Field Descriptions:** The recommended BigQuery table schema now includes descriptions for each field, improving data understandability. * **Optimized Table Structure:** The plugin now creates the table with daily partitioning on `timestamp` and clustering on `event_type`, `agent`, and `user_id` by default. * **Truncation Flag:** A new boolean field `is_truncated` is added to the schema to show if the `content` was truncated. PiperOrigin-RevId: 832436799
1 parent 696852a commit 7c993b0

File tree

2 files changed

+298
-46
lines changed

2 files changed

+298
-46
lines changed

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 172 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,19 @@ def _get_event_type(event: Event) -> str:
245245

246246
def _format_content(
247247
content: Optional[types.Content], max_len: int = 500
248-
) -> str:
249-
"""Formats an Event content for logging."""
248+
) -> tuple[str, bool]:
249+
"""Formats an Event content for logging.
250+
251+
Args:
252+
content: The Event content to format.
253+
max_len: The maximum length of the text parts before truncation.
254+
255+
Returns:
256+
A tuple containing the formatted content string and a boolean indicating if
257+
the content was truncated.
258+
"""
250259
if not content or not content.parts:
251-
return "None"
260+
return "None", False
252261
parts = []
253262
for p in content.parts:
254263
if p.text:
@@ -263,18 +272,33 @@ def _format_content(
263272
parts.append(f"resp: {p.function_response.name}")
264273
else:
265274
parts.append("other")
266-
return " | ".join(parts)
275+
return " | ".join(parts), any(
276+
len(p.text) > max_len for p in content.parts if p.text
277+
)
267278

268279

269-
def _format_args(args: dict[str, Any], max_len: int = 1000) -> str:
270-
"""Formats tool arguments or results for logging."""
280+
def _format_args(
281+
args: dict[str, Any], *, max_len: int = 1000
282+
) -> tuple[str, bool]:
283+
"""Formats tool arguments or results for logging.
284+
285+
Args:
286+
args: The tool arguments or results dictionary to format.
287+
max_len: The maximum length of the output string before truncation.
288+
289+
Returns:
290+
A tuple containing the JSON formatted string and a boolean indicating if
291+
the content was truncated.
292+
"""
271293
if not args:
272-
return "{}"
294+
return "{}", False
273295
try:
274296
s = json.dumps(args)
275297
except TypeError:
276298
s = str(args)
277-
return s[:max_len] + "..." if len(s) > max_len else s
299+
if len(s) > max_len:
300+
return s[:max_len] + "...", True
301+
return s, False
278302

279303

280304
class BigQueryAgentAnalyticsPlugin(BasePlugin):
@@ -322,29 +346,99 @@ def __init__(
322346
self._background_tasks: set[asyncio.Task] = set()
323347
self._is_shutting_down = False
324348
self._schema = [
325-
bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
326-
bigquery.SchemaField("event_type", "STRING", mode="NULLABLE"),
327-
bigquery.SchemaField("agent", "STRING", mode="NULLABLE"),
328-
bigquery.SchemaField("session_id", "STRING", mode="NULLABLE"),
329-
bigquery.SchemaField("invocation_id", "STRING", mode="NULLABLE"),
330-
bigquery.SchemaField("user_id", "STRING", mode="NULLABLE"),
331-
bigquery.SchemaField("content", "STRING", mode="NULLABLE"),
332-
bigquery.SchemaField("error_message", "STRING", mode="NULLABLE"),
349+
bigquery.SchemaField(
350+
"timestamp",
351+
"TIMESTAMP",
352+
mode="REQUIRED",
353+
description="The UTC time at which the event was logged.",
354+
),
355+
bigquery.SchemaField(
356+
"event_type",
357+
"STRING",
358+
mode="NULLABLE",
359+
description=(
360+
"Indicates the type of event being logged (e.g., 'LLM_REQUEST',"
361+
" 'TOOL_COMPLETED')."
362+
),
363+
),
364+
bigquery.SchemaField(
365+
"agent",
366+
"STRING",
367+
mode="NULLABLE",
368+
description=(
369+
"The name of the ADK agent or author associated with the event."
370+
),
371+
),
372+
bigquery.SchemaField(
373+
"session_id",
374+
"STRING",
375+
mode="NULLABLE",
376+
description=(
377+
"A unique identifier to group events within a single"
378+
" conversation or user session."
379+
),
380+
),
381+
bigquery.SchemaField(
382+
"invocation_id",
383+
"STRING",
384+
mode="NULLABLE",
385+
description=(
386+
"A unique identifier for each individual agent execution or"
387+
" turn within a session."
388+
),
389+
),
390+
bigquery.SchemaField(
391+
"user_id",
392+
"STRING",
393+
mode="NULLABLE",
394+
description=(
395+
"The identifier of the user associated with the current"
396+
" session."
397+
),
398+
),
399+
bigquery.SchemaField(
400+
"content",
401+
"STRING",
402+
mode="NULLABLE",
403+
description=(
404+
"The event-specific data (payload). Format varies by"
405+
" event_type."
406+
),
407+
),
408+
bigquery.SchemaField(
409+
"error_message",
410+
"STRING",
411+
mode="NULLABLE",
412+
description=(
413+
"Populated if an error occurs during the processing of the"
414+
" event."
415+
),
416+
),
417+
bigquery.SchemaField(
418+
"is_truncated",
419+
"BOOLEAN",
420+
mode="NULLABLE",
421+
description=(
422+
"Indicates if the content field was truncated due to size"
423+
" limits."
424+
),
425+
),
333426
]
334427

335428
def _format_content_safely(
336429
self, content: Optional[types.Content]
337-
) -> str | None:
430+
) -> tuple[str | None, bool]:
338431
"""Formats content using self._config.content_formatter or _format_content, catching errors."""
339432
if content is None:
340-
return None
433+
return None, False
341434
try:
342435
if self._config.content_formatter:
343-
return self._config.content_formatter(content)
436+
# Custom formatter: we assume no truncation or we can't know.
437+
return self._config.content_formatter(content), False
344438
return _format_content(content, max_len=self._config.max_content_length)
345439
except Exception as e:
346440
logging.warning(f"Content formatter failed: {e}")
347-
return "[FORMATTING FAILED]"
441+
return "[FORMATTING FAILED]", False
348442

349443
async def _ensure_init(self):
350444
"""Ensures BigQuery clients are initialized."""
@@ -375,6 +469,10 @@ def create_resources():
375469
f"{self._project_id}.{self._dataset_id}.{self._table_id}",
376470
schema=self._schema,
377471
)
472+
table.time_partitioning = bigquery.TimePartitioning(
473+
type_="DAY", field="timestamp"
474+
)
475+
table.clustering_fields = ["event_type", "agent", "user_id"]
378476
self._bq_client.create_table(table, exists_ok=True)
379477
logging.info(
380478
"BQ Plugin: Dataset %s and Table %s ensured to exist.",
@@ -462,6 +560,7 @@ async def _log(self, data: dict):
462560
"user_id": None,
463561
"content": None,
464562
"error_message": None,
563+
"is_truncated": False,
465564
}
466565
row.update(data)
467566

@@ -519,13 +618,15 @@ async def on_user_message_callback(
519618
user_message: types.Content,
520619
) -> None:
521620
"""Callback for user messages."""
621+
content, truncated = self._format_content_safely(user_message)
522622
await self._log({
523623
"event_type": "USER_MESSAGE_RECEIVED",
524624
"agent": invocation_context.agent.name,
525625
"session_id": invocation_context.session.id,
526626
"invocation_id": invocation_context.invocation_id,
527627
"user_id": invocation_context.session.user_id,
528-
"content": f"User Content: {self._format_content_safely(user_message)}",
628+
"content": f"User Content: {content}",
629+
"is_truncated": truncated,
529630
})
530631

531632
async def before_run_callback(
@@ -544,15 +645,17 @@ async def on_event_callback(
544645
self, *, invocation_context: InvocationContext, event: Event
545646
) -> None:
546647
"""Callback for agent events."""
648+
content, truncated = self._format_content_safely(event.content)
547649
await self._log({
548650
"event_type": _get_event_type(event),
549651
"agent": event.author,
550652
"session_id": invocation_context.session.id,
551653
"invocation_id": invocation_context.invocation_id,
552654
"user_id": invocation_context.session.user_id,
553-
"content": self._format_content_safely(event.content),
655+
"content": content,
554656
"error_message": event.error_message,
555657
"timestamp": datetime.fromtimestamp(event.timestamp, timezone.utc),
658+
"is_truncated": truncated,
556659
})
557660

558661
async def after_run_callback(
@@ -600,10 +703,15 @@ async def before_model_callback(
600703
content_parts = [
601704
f"Model: {llm_request.model or 'default'}",
602705
]
706+
is_truncated = False
603707
if contents := getattr(llm_request, "contents", None):
604-
prompt_str = " | ".join(
605-
[f"{c.role}: {self._format_content_safely(c)}" for c in contents]
606-
)
708+
prompt_parts = []
709+
for c in contents:
710+
c_str, c_trunc = self._format_content_safely(c)
711+
prompt_parts.append(f"{c.role}: {c_str}")
712+
if c_trunc:
713+
is_truncated = True
714+
prompt_str = " | ".join(prompt_parts)
607715
content_parts.append(f"Prompt: {prompt_str}")
608716
system_instruction_text = "None"
609717
if llm_request.config and llm_request.config.system_instruction:
@@ -656,13 +764,15 @@ async def before_model_callback(
656764
max_len = self._config.max_content_length
657765
if len(final_content) > max_len:
658766
final_content = final_content[:max_len] + "..."
767+
is_truncated = True
659768
await self._log({
660769
"event_type": "LLM_REQUEST",
661770
"agent": callback_context.agent_name,
662771
"session_id": callback_context.session.id,
663772
"invocation_id": callback_context.invocation_id,
664773
"user_id": callback_context.session.user_id,
665774
"content": final_content,
775+
"is_truncated": is_truncated,
666776
})
667777

668778
async def after_model_callback(
@@ -672,6 +782,7 @@ async def after_model_callback(
672782
content_parts = []
673783
content = llm_response.content
674784
is_tool_call = False
785+
is_truncated = False
675786
if content and content.parts:
676787
is_tool_call = any(part.function_call for part in content.parts)
677788

@@ -685,8 +796,12 @@ async def after_model_callback(
685796
]
686797
content_parts.append(f"Tool Name: {', '.join(fc_names)}")
687798
else:
688-
text_content = self._format_content_safely(llm_response.content)
799+
text_content, truncated = self._format_content_safely(
800+
llm_response.content
801+
)
689802
content_parts.append(f"Tool Name: text_response, {text_content}")
803+
if truncated:
804+
is_truncated = True
690805

691806
if llm_response.usage_metadata:
692807
prompt_tokens = getattr(
@@ -713,6 +828,7 @@ async def after_model_callback(
713828
"user_id": callback_context.session.user_id,
714829
"content": final_content,
715830
"error_message": llm_response.error_message,
831+
"is_truncated": is_truncated,
716832
})
717833

718834
async def before_tool_callback(
@@ -723,17 +839,24 @@ async def before_tool_callback(
723839
tool_context: ToolContext,
724840
) -> None:
725841
"""Callback before tool call."""
842+
args_str, truncated = _format_args(
843+
tool_args, max_len=self._config.max_content_length
844+
)
845+
content = (
846+
f"Tool Name: {tool.name}, Description: {tool.description},"
847+
f" Arguments: {args_str}"
848+
)
849+
if len(content) > self._config.max_content_length:
850+
content = content[: self._config.max_content_length] + "..."
851+
truncated = True
726852
await self._log({
727853
"event_type": "TOOL_STARTING",
728854
"agent": tool_context.agent_name,
729855
"session_id": tool_context.session.id,
730856
"invocation_id": tool_context.invocation_id,
731857
"user_id": tool_context.session.user_id,
732-
"content": (
733-
f"Tool Name: {tool.name}, Description: {tool.description},"
734-
" Arguments:"
735-
f" {_format_args(tool_args, max_len=self._config.max_content_length)}"
736-
),
858+
"content": content,
859+
"is_truncated": truncated,
737860
})
738861

739862
async def after_tool_callback(
@@ -745,16 +868,21 @@ async def after_tool_callback(
745868
result: dict[str, Any],
746869
) -> None:
747870
"""Callback after tool call."""
871+
result_str, truncated = _format_args(
872+
result, max_len=self._config.max_content_length
873+
)
874+
content = f"Tool Name: {tool.name}, Result: {result_str}"
875+
if len(content) > self._config.max_content_length:
876+
content = content[: self._config.max_content_length] + "..."
877+
truncated = True
748878
await self._log({
749879
"event_type": "TOOL_COMPLETED",
750880
"agent": tool_context.agent_name,
751881
"session_id": tool_context.session.id,
752882
"invocation_id": tool_context.invocation_id,
753883
"user_id": tool_context.session.user_id,
754-
"content": (
755-
f"Tool Name: {tool.name}, Result:"
756-
f" {_format_args(result, max_len=self._config.max_content_length)}"
757-
),
884+
"content": content,
885+
"is_truncated": truncated,
758886
})
759887

760888
async def on_model_error_callback(
@@ -783,15 +911,20 @@ async def on_tool_error_callback(
783911
error: Exception,
784912
) -> None:
785913
"""Callback for tool errors."""
914+
args_str, truncated = _format_args(
915+
tool_args, max_len=self._config.max_content_length
916+
)
917+
content = f"Tool Name: {tool.name}, Arguments: {args_str}"
918+
if len(content) > self._config.max_content_length:
919+
content = content[: self._config.max_content_length] + "..."
920+
truncated = True
786921
await self._log({
787922
"event_type": "TOOL_ERROR",
788923
"agent": tool_context.agent_name,
789924
"session_id": tool_context.session.id,
790925
"invocation_id": tool_context.invocation_id,
791926
"user_id": tool_context.session.user_id,
792-
"content": (
793-
f"Tool Name: {tool.name}, Arguments:"
794-
f" {_format_args(tool_args, max_len=self._config.max_content_length)}"
795-
),
927+
"content": content,
796928
"error_message": str(error),
929+
"is_truncated": truncated,
797930
})

0 commit comments

Comments
 (0)