@@ -231,6 +231,28 @@ def _warn(msg: str):
231231 _warn ._LOGGER .warning (msg ) # pyright: ignore[reportFunctionMemberAccess]
232232
233233
def _force_flush_traces():
    """Best-effort flush of pending spans on the global tracer provider.

    The OpenTelemetry API and SDK are imported lazily so this helper is safe
    to call when the tracing extras are not installed; in that case a warning
    is emitted through `_warn` and nothing else happens.

    A flush is attempted only when the globally registered tracer provider is
    an SDK `TracerProvider`; any other provider is left untouched.

    This helper never raises on a flush failure. It is invoked from cleanup
    paths (for example a `finally:` clause), where an exception raised here
    would replace the exception that is already propagating.

    Returns:
        None, always. An unsuccessful flush is reported via `_warn`.
    """
    try:
        import opentelemetry.trace
    except (ImportError, AttributeError):
        _warn(
            "Could not force flush traces. opentelemetry-api is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'."
        )
        return None

    try:
        import opentelemetry.sdk.trace
    except (ImportError, AttributeError):
        _warn(
            "Could not force flush traces. opentelemetry-sdk is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'."
        )
        return None

    provider = opentelemetry.trace.get_tracer_provider()
    if not isinstance(provider, opentelemetry.sdk.trace.TracerProvider):
        # Only the SDK provider is flushed here; other providers are skipped.
        return None

    try:
        flushed = provider.force_flush()
    except Exception as e:  # Telemetry must never break or mask the request path.
        _warn(f"Could not force flush traces: {e}")
        return None

    # `force_flush` reports an unsuccessful flush by returning False; surface
    # it instead of silently discarding the result.
    if flushed is False:
        _warn(
            "Could not force flush traces. The tracer provider did not finish flushing; some spans may be lost."
        )
    return None
255+
234256def _default_instrumentor_builder (
235257 project_id : str ,
236258 * ,
@@ -311,28 +333,23 @@ def _detect_cloud_resource_id(project_id: str) -> Optional[str]:
311333
312334 if enable_tracing :
313335 try :
314- import opentelemetry .exporter .cloud_trace
336+ import opentelemetry .exporter .otlp .proto .http .trace_exporter
337+ import google .auth .transport .requests
315338 except (ImportError , AttributeError ):
316339 return _warn_missing_dependency (
317- "opentelemetry-exporter-gcp-trace" , needed_for_tracing = True
318- )
319-
320- try :
321- import google .cloud .trace_v2
322- except (ImportError , AttributeError ):
323- return _warn_missing_dependency (
324- "google-cloud-trace" , needed_for_tracing = True
340+ "opentelemetry-exporter-otlp-proto-http" , needed_for_tracing = True
325341 )
326342
327343 import google .auth
328344
329345 credentials , _ = google .auth .default ()
330- span_exporter = opentelemetry .exporter .cloud_trace .CloudTraceSpanExporter (
331- project_id = project_id ,
332- client = google .cloud .trace_v2 .TraceServiceClient (
333- credentials = credentials .with_quota_project (project_id ),
334- ),
335- resource_regex = "|" .join (resource .attributes .keys ()),
346+ span_exporter = (
347+ opentelemetry .exporter .otlp .proto .http .trace_exporter .OTLPSpanExporter (
348+ session = google .auth .transport .requests .AuthorizedSession (
349+ credentials = credentials
350+ ),
351+ endpoint = "https://telemetry.googleapis.com/v1/traces" ,
352+ )
336353 )
337354 span_processor = opentelemetry .sdk .trace .export .BatchSpanProcessor (
338355 span_exporter = span_exporter ,
@@ -695,54 +712,17 @@ def set_up(self):
695712 else :
696713 os .environ ["ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS" ] = "false"
697714
698- GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
699- "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
700- )
701-
702- def telemetry_enabled () -> Optional [bool ]:
703- return (
704- os .getenv (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY , "0" ).lower ()
705- in ("true" , "1" )
706- if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in os .environ
707- else None
708- )
709-
710- # Tracing enablement follows truth table:
711- def tracing_enabled () -> bool :
712- """Tracing enablement follows true table:
713-
714- | enable_tracing | enable_telemetry(env) | tracing_actually_enabled |
715- |----------------|-----------------------|--------------------------|
716- | false | false | false |
717- | false | true | false |
718- | false | None | false |
719- | true | false | false |
720- | true | true | true |
721- | true | None | true |
722- | None(default) | false | false |
723- | None(default) | true | adk_version >= 1.17 |
724- | None(default) | None | false |
725- """
726- enable_tracing : Optional [bool ] = self ._tmpl_attrs .get ("enable_tracing" )
727- enable_telemetry : Optional [bool ] = telemetry_enabled ()
728-
729- return (enable_tracing is True and enable_telemetry is not False ) or (
730- enable_tracing is None
731- and enable_telemetry is True
732- and is_version_sufficient ("1.17.0" )
733- )
734-
735- enable_logging = bool (telemetry_enabled ())
715+ enable_logging = bool (self ._telemetry_enabled ())
736716
737717 custom_instrumentor = self ._tmpl_attrs .get ("instrumentor_builder" )
738718
739- if custom_instrumentor and tracing_enabled ():
719+ if custom_instrumentor and self . _tracing_enabled ():
740720 self ._tmpl_attrs ["instrumentor" ] = custom_instrumentor (project )
741721
742722 if not custom_instrumentor :
743723 self ._tmpl_attrs ["instrumentor" ] = _default_instrumentor_builder (
744724 project ,
745- enable_tracing = tracing_enabled (),
725+ enable_tracing = self . _tracing_enabled (),
746726 enable_logging = enable_logging ,
747727 )
748728
@@ -914,9 +894,14 @@ async def async_stream_query(
914894 ** kwargs ,
915895 )
916896
917- async for event in events_async :
918- # Yield the event data as a dictionary
919- yield _utils .dump_event_for_json (event )
897+ try :
898+ async for event in events_async :
899+ # Yield the event data as a dictionary
900+ yield _utils .dump_event_for_json (event )
901+ finally :
902+ # Avoid trace data loss having to do with CPU throttling on instance turndown
903+ if self ._tracing_enabled ():
904+ _ = await asyncio .to_thread (_force_flush_traces )
920905
921906 def stream_query (
922907 self ,
@@ -1068,6 +1053,9 @@ async def streaming_agent_run_with_events(self, request_json: str):
10681053 user_id = request .user_id ,
10691054 session_id = session .id ,
10701055 )
1056+ # Avoid trace data loss having to do with CPU throttling on instance turndown
1057+ if self ._tracing_enabled ():
1058+ _ = await asyncio .to_thread (_force_flush_traces )
10711059
10721060 async def async_get_session (
10731061 self ,
@@ -1450,3 +1438,52 @@ def register_operations(self) -> Dict[str, List[str]]:
14501438 "streaming_agent_run_with_events" ,
14511439 ],
14521440 }
1441+
def _telemetry_enabled(self) -> Optional[bool]:
    """Report whether telemetry is switched on via the environment.

    The answer is read from the `GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY`
    environment variable:

    - Logging is always enabled when telemetry is enabled.
    - Tracing additionally follows the truth table documented on the
      `_tracing_enabled` method, so existing user enablement keeps working.

    Returns:
        True when the variable is set to "true" or "1" (case-insensitive),
        False when it is set to any other value, and None when the variable
        is not set at all (i.e. old deployments which don't support this
        env variable).
    """
    import os

    raw_setting = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY")
    if raw_setting is None:
        # Unset is deliberately distinct from "disabled".
        return None
    return raw_setting.lower() in ("true", "1")
1465+
1466+ # Tracing enablement follows truth table:
def _tracing_enabled(self) -> bool:
    """Decide whether tracing is actually enabled.

    Combines the template's `enable_tracing` attribute with the telemetry
    environment setting according to this truth table:

    | enable_tracing | enable_telemetry(env) | tracing_actually_enabled |
    |----------------|-----------------------|--------------------------|
    | false          | false                 | false                    |
    | false          | true                  | false                    |
    | false          | None                  | false                    |
    | true           | false                 | false                    |
    | true           | true                  | true                     |
    | true           | None                  | true                     |
    | None(default)  | false                 | false                    |
    | None(default)  | true                  | adk_version >= 1.17      |
    | None(default)  | None                  | false                    |
    """
    requested = self._tmpl_attrs.get("enable_tracing")
    telemetry = self._telemetry_enabled()

    if requested is True:
        # Explicit opt-in wins unless telemetry is explicitly disabled.
        return telemetry is not False

    if requested is None and telemetry is True:
        # Default setting: follow telemetry, gated on the ADK version.
        return is_version_sufficient("1.17.0")

    return False
0 commit comments