Feat/support streaming intermediate results with non live #4170
base: main
Conversation
…un_async method with streaming mode.
…-with-stream' into feat/generative-function-calling-with-stream
# Conflicts:
#   src/google/adk/agents/readonly_context.py
#   src/google/adk/flows/llm_flows/functions.py
#   src/google/adk/tools/_function_parameter_parse_util.py
#   src/google/adk/tools/function_tool.py
#   tests/unittests/testing_utils.py
Summary of Changes

Hello @Lin-Nikaido, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant enhancement to the LLM flow by enabling the streaming of intermediate results from tool executions. This allows long-running tools to provide real-time progress updates, improving the user experience for interactive agents. The changes update the core function-calling logic to process generator and async generator outputs as events, ensuring that each yielded value is propagated when operating in Server-Sent Events (SSE) streaming mode. This makes the execution of complex tools more transparent and responsive to the end user.
Code Review
This pull request introduces a valuable feature to support streaming intermediate results from tools that are generators, which is particularly useful for long-running tasks. The changes are well-implemented across the LLM flow and tool handling logic, correctly distinguishing between streaming and non-streaming modes. The addition of a new, comprehensive test file for this feature is also a great contribution. I've identified a few opportunities for refactoring to reduce code duplication, which would enhance the maintainability of the new logic.
```python
if inspect.isasyncgen(function_response) or isinstance(
    function_response, AsyncIterator
):
  res = None
  async for res in function_response:
    if inspect.isawaitable(res):
      res = await res
    if (
        invocation_context.run_config.streaming_mode
        == StreamingMode.SSE
    ):
      yield __build_response_event(
          tool, res, tool_context, invocation_context
      )
  function_response = res
elif inspect.isgenerator(function_response) or isinstance(
    function_response, Iterator
):
  res = None
  for res in function_response:
    if inspect.isawaitable(res):
      res = await res
    if (
        invocation_context.run_config.streaming_mode
        == StreamingMode.SSE
    ):
      yield __build_response_event(
          tool, res, tool_context, invocation_context
      )
  function_response = res
```
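For context on the two branches above, here is a small standalone sketch (not from the ADK codebase) showing how `inspect.isgenerator` and `inspect.isasyncgen` classify tool return values:

```python
import inspect

def sync_tool():
    # Ordinary generator function: calling it returns a generator object.
    yield "progress: 50%"
    yield "done"

async def async_tool():
    # Async generator function: calling it returns an async generator object.
    yield "progress: 50%"
    yield "done"

gen = sync_tool()
agen = async_tool()

print(inspect.isgenerator(gen))   # True
print(inspect.isasyncgen(gen))    # False
print(inspect.isasyncgen(agen))   # True
print(inspect.isgenerator(agen))  # False
```

This is why the code checks `isasyncgen` first and falls back to the synchronous `isgenerator`/`Iterator` case.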
```python
if origin in {ABCGenerator, ABCIterator, ABCIterable}:
  schema.type = types.Type.ARRAY
  item_ann = args[0] if args else Any
  schema.items = _parse_schema_from_parameter(
      variant,
      inspect.Parameter(
          'item',
          inspect.Parameter.POSITIONAL_OR_KEYWORD,
          annotation=item_ann,
      ),
      func_name,
  )
  if param.default is not inspect.Parameter.empty:
    if not _is_default_value_compatible(param.default, param.annotation):
      raise ValueError(default_value_error_msg)
    schema.default = param.default
  _raise_if_schema_unsupported(variant, schema)
  return schema
if origin in {ABCAsyncGenerator, ABCAsyncIterator, ABCAsyncIterable}:
  schema.type = types.Type.ARRAY
  item_ann = args[0] if args else Any
  schema.items = _parse_schema_from_parameter(
      variant,
      inspect.Parameter(
          'item',
          inspect.Parameter.POSITIONAL_OR_KEYWORD,
          annotation=item_ann,
      ),
      func_name,
  )
  if param.default is not inspect.Parameter.empty:
    if not _is_default_value_compatible(param.default, param.annotation):
      raise ValueError(default_value_error_msg)
    schema.default = param.default
  _raise_if_schema_unsupported(variant, schema)
  return schema
```
The logic in these two if blocks for handling synchronous and asynchronous generators is identical. You can combine them into a single block to reduce code duplication and improve maintainability.
```python
if origin in {
    ABCGenerator,
    ABCIterator,
    ABCIterable,
    ABCAsyncGenerator,
    ABCAsyncIterator,
    ABCAsyncIterable,
}:
  schema.type = types.Type.ARRAY
  item_ann = args[0] if args else Any
  schema.items = _parse_schema_from_parameter(
      variant,
      inspect.Parameter(
          'item',
          inspect.Parameter.POSITIONAL_OR_KEYWORD,
          annotation=item_ann,
      ),
      func_name,
  )
  if param.default is not inspect.Parameter.empty:
    if not _is_default_value_compatible(param.default, param.annotation):
      raise ValueError(default_value_error_msg)
    schema.default = param.default
  _raise_if_schema_unsupported(variant, schema)
  return schema
```

```python
elif is_generator:
  # If streaming_mode is SSE, return the generator object as-is.
  if tool_context.run_config.streaming_mode == StreamingMode.SSE:
    return target(**args_to_call)

  # If streaming_mode is not SSE, return the last yielded value.
  res = None
  for res in target(**args_to_call):
    if inspect.isawaitable(res):
      res = await res
  return res

elif is_asyncgen:
  # If streaming_mode is SSE, return the async generator object as-is.
  if tool_context.run_config.streaming_mode == StreamingMode.SSE:
    return target(**args_to_call)

  # If streaming_mode is not SSE, return the last yielded value.
  res = None
  async for res in target(**args_to_call):
    if inspect.isawaitable(res):
      res = await res
  return res
```
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Solution:
Enable a will_continue-like function response in the BaseLlmFlow.run_async method with streaming mode.
The tool is expected to return a generator; when streaming_mode is StreamingMode.SSE, the runner.run_async method yields each generator result as an Event. When streaming_mode is not SSE, there is no change in behavior. The expected use case is a function tool that takes a few minutes in total, where the user wants to be notified of its progress.
e.g. a function like this:
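The example function from the original PR description is not preserved in this page; a minimal hypothetical sketch of the kind of tool described (all names here are illustrative, not from the PR) might look like:

```python
import asyncio

async def generate_report(topic: str):
    """Hypothetical long-running function tool that streams progress updates.

    In SSE mode, each intermediate yield can be surfaced to the user as an
    Event; the final yield carries the actual result.
    """
    yield {"status": "collecting sources...", "progress": 0.2}
    await asyncio.sleep(0)  # stand-in for work that takes minutes
    yield {"status": "drafting...", "progress": 0.7}
    yield {"status": "complete", "report": f"Report on {topic}"}

async def demo():
    async for update in generate_report("streaming tools"):
        print(update)

asyncio.run(demo())
```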
Testing Plan
Please describe the tests that you ran to verify your changes. This is required
for all PRs that are not small documentation or typo fixes.
Unit Tests:
Please include a summary of passed pytest results.

Added tests/unittests/tools/test_tools_generative_call.py, and it passed. All other unit tests also pass locally.
Manual End-to-End (E2E) Tests:
Checklist
Additional context
This capability has been strongly requested, and I believe it will significantly enhance the developer experience when building practical, production-grade agents with ADK.
I would appreciate it if you could review and consider this change. If you have any change requests, I will address them as soon as possible.
Thank you.