Add UIWorker#4540
Conversation
36f3b72 to
e8ec7c5
Compare
e7ce598 to
9054912
Compare
6c98f7e to
1e5df7c
Compare
| @@ -0,0 +1,178 @@ | |||
| # | |||
There was a problem hiding this comment.
Should this go into bus/ui/messages.py? following the workers pattern
There was a problem hiding this comment.
Good idea. Moving.
|
|
||
|
|
||
| @dataclass | ||
| class BusUITaskGroupStartedMessage(BusDataMessage): |
There was a problem hiding this comment.
This should be called XXXJobGroupXXX I think, same for the ones below.
There was a problem hiding this comment.
Updated all terms to align to Worker and JobGroup
| @@ -52,6 +63,16 @@ | |||
| from pipecat.pipeline.worker_observer import WorkerObserver | |||
| from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup | |||
| from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor | |||
| from pipecat.processors.frameworks.rtvi.frames import RTVIUICommandFrame, RTVIUITaskFrame | |||
| from pipecat.processors.frameworks.rtvi.models import ( | |||
| UICancelTaskMessage, | |||
There was a problem hiding this comment.
UICancelWorkerMessage I think
There was a problem hiding this comment.
Updated to UICancelJobGroupMessage.
| event_name=event_name, | ||
| payload=payload, | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Maybe a small private function:
@self.rtvi.event_handler("on_ui_message")
async def on_ui_message(rtvi: RTVIProcessor, message):
await self._send_rtvi_bus_message(message)
| """Decorator for marking worker methods as UI event handlers.""" | ||
|
|
||
|
|
||
| def on_ui_event(name: str): |
There was a problem hiding this comment.
should this be @ui_event? we usually only use on_XXX for event handler. for decorators we have @job instead of @on_job_request
There was a problem hiding this comment.
Good catch. Done.
Addresses PR #4540 feedback: the inbound RTVI-UI -> bus translation was a ~24-line closure inside PipelineWorker.__init__. Move it to a typed private method, leaving the event handler a one-line delegate. Reads as the inbound counterpart to on_bus_message (the outbound bus -> RTVI translation). Behavior unchanged. Named _republish_ui_message_on_bus rather than the suggested _send_rtvi_bus_message: it matches the comment's verb ('republish ... onto the bus') and is less ambiguous (it sends a BusUIEventMessage derived from the RTVI message, not an 'RTVI bus message').
Addresses PR #4540 feedback. Pipecat reserves the on_XXX form for event-handler names (the string passed to event_handler(...)); decorators that mark methods are named after the thing they handle -- @job (not @on_job_request), @tool, etc. @on_ui_event mixed the two, so it becomes @ui_event("nav_click"), paralleling @job(name="respond"). Updated the decorator, its export, docstrings, the duplicate-handler error message, tests, the document-review example, and the changelog fragment. Internal markers (is_ui_event_handler / ui_event_name) are unchanged.
Addresses PR #4540 feedback. Pipecat reserves the on_XXX form for event-handler names (the string passed to event_handler(...)); decorators that mark methods are named after the thing they handle -- @job (not @on_job_request), @tool, etc. @on_ui_event mixed the two, so it becomes @ui_event("nav_click"), paralleling @job(name="respond"). Updated the decorator, its export, docstrings, the duplicate-handler error message, tests, the document-review example, and the changelog fragment. Internal markers (is_ui_event_handler / ui_event_name) are unchanged.
Addresses PR #4540 feedback: the inbound RTVI-UI -> bus translation was a ~24-line closure inside PipelineWorker.__init__. Move it to a typed private method, leaving the event handler a one-line delegate. Reads as the inbound counterpart to on_bus_message (the outbound bus -> RTVI translation). Behavior unchanged. Named _republish_ui_message_on_bus rather than the suggested _send_rtvi_bus_message: it matches the comment's verb ('republish ... onto the bus') and is less ambiguous (it sends a BusUIEventMessage derived from the RTVI message, not an 'RTVI bus message').
Addresses PR #4540 feedback. Pipecat reserves the on_XXX form for event-handler names (the string passed to event_handler(...)); decorators that mark methods are named after the thing they handle -- @job (not @on_job_request), @tool, etc. @on_ui_event mixed the two, so it becomes @ui_event("nav_click"), paralleling @job(name="respond"). Updated the decorator, its export, docstrings, the duplicate-handler error message, tests, the document-review example, and the changelog fragment. Internal markers (is_ui_event_handler / ui_event_name) are unchanged.
Addresses PR #4540 feedback: the inbound RTVI-UI -> bus translation was a ~24-line closure inside PipelineWorker.__init__. Move it to a typed private method, leaving the event handler a one-line delegate. Reads as the inbound counterpart to on_bus_message (the outbound bus -> RTVI translation). Behavior unchanged. Named _republish_ui_message_on_bus rather than the suggested _send_rtvi_bus_message: it matches the comment's verb ('republish ... onto the bus') and is less ambiguous (it sends a BusUIEventMessage derived from the RTVI message, not an 'RTVI bus message').
Addresses PR #4540 feedback. Pipecat reserves the on_XXX form for event-handler names (the string passed to event_handler(...)); decorators that mark methods are named after the thing they handle -- @job (not @on_job_request), @tool, etc. @on_ui_event mixed the two, so it becomes @ui_event("nav_click"), paralleling @job(name="respond"). Updated the decorator, its export, docstrings, the duplicate-handler error message, tests, the document-review example, and the changelog fragment. Internal markers (is_ui_event_handler / ui_event_name) are unchanged.
Addresses PR #4540 feedback: the inbound RTVI-UI -> bus translation was a ~24-line closure inside PipelineWorker.__init__. Move it to a typed private method, leaving the event handler a one-line delegate. Reads as the inbound counterpart to on_bus_message (the outbound bus -> RTVI translation). Behavior unchanged. Named _republish_ui_message_on_bus rather than the suggested _send_rtvi_bus_message: it matches the comment's verb ('republish ... onto the bus') and is less ambiguous (it sends a BusUIEventMessage derived from the RTVI message, not an 'RTVI bus message').
Addresses PR #4540 feedback. Pipecat reserves the on_XXX form for event-handler names (the string passed to event_handler(...)); decorators that mark methods are named after the thing they handle -- @job (not @on_job_request), @tool, etc. @on_ui_event mixed the two, so it becomes @ui_event("nav_click"), paralleling @job(name="respond"). Updated the decorator, its export, docstrings, the duplicate-handler error message, tests, the document-review example, and the changelog fragment. Internal markers (is_ui_event_handler / ui_event_name) are unchanged.
Addresses PR #4540 feedback: the inbound RTVI-UI -> bus translation was a ~24-line closure inside PipelineWorker.__init__. Move it to a typed private method, leaving the event handler a one-line delegate. Reads as the inbound counterpart to on_bus_message (the outbound bus -> RTVI translation). Behavior unchanged. Named _republish_ui_message_on_bus rather than the suggested _send_rtvi_bus_message: it matches the comment's verb ('republish ... onto the bus') and is less ambiguous (it sends a BusUIEventMessage derived from the RTVI message, not an 'RTVI bus message').
Addresses PR #4540 feedback. Pipecat reserves the on_XXX form for event-handler names (the string passed to event_handler(...)); decorators that mark methods are named after the thing they handle -- @job (not @on_job_request), @tool, etc. @on_ui_event mixed the two, so it becomes @ui_event("nav_click"), paralleling @job(name="respond"). Updated the decorator, its export, docstrings, the duplicate-handler error message, tests, the document-review example, and the changelog fragment. Internal markers (is_ui_event_handler / ui_event_name) are unchanged.
|
|
||
|
|
||
| @dataclass | ||
| class BusUIEventMessage(BusDataMessage): |
There was a problem hiding this comment.
It might be a good idea to have a BusUIDataMessage(BusDataMessage) and all the UI messages inherit from it. There's a small reason in another comment.
| if message.target and message.target != self.name: | ||
| return | ||
|
|
||
| if isinstance(message, BusTTSSpeakMessage): | ||
| await self.queue_frame( | ||
| TTSSpeakFrame(text=message.text, append_to_context=message.append_to_context) | ||
| ) | ||
| return | ||
|
|
||
| if self._rtvi is None: |
There was a problem hiding this comment.
if we add BusUIDataMessage we could do:
if self._rtvi and isinstance(message, BusUIDataMessage):
await self._handle_ui_bus_message(message)
|
|
||
| Args: | ||
| message: The bus message to process. | ||
| """Handle bus messages for outbound RTVI UI messages. |
There was a problem hiding this comment.
This function can do more than just RTVI. I'd say the previous line was more generic.
| active: Whether the worker starts active. Defaults to ``True`` | ||
| (vs. ``False`` on ``LLMWorker`` / ``LLMContextWorker``) since a | ||
| UIWorker is typically an always-on delegate. Pass ``False`` for | ||
| handoff use cases. |
There was a problem hiding this comment.
I'm wondering if we really need active in UIWorker. This will always be a completely separate LLM that handles UI, plus the user can't enable the bridge. Active usually only makes sense when you have a bridge right now. So I'd suggest to remove it and just set it to True.
| response = {"answer": answer} if answer else None | ||
| pending.set_result({"response": response, "status": status}) | ||
|
|
||
| def user_job_group( |
There was a problem hiding this comment.
Should this be called ui_job_group? Also, UIJobGroupContext. All the new stuff is called UI.
| cancellable=cancellable, | ||
| ) | ||
|
|
||
| async def start_user_job_group( |
| ) | ||
| return job_id | ||
|
|
||
| def _register_user_job_group( |
There was a problem hiding this comment.
_register_ui_job_group()
| cancellable=cancellable, | ||
| ) | ||
|
|
||
| def _unregister_user_job_group(self, job_id: str) -> None: |
There was a problem hiding this comment.
_unregister_ui_job_group
Align the job-group API with the rest of the UI surface (UIWorker, BusUI*, UICommand) per review feedback -- user -> ui: - ui_job_group / start_ui_job_group / _register_ui_job_group / _unregister_ui_job_group - UIJobGroupContext (was UserJobGroupContext) - internals _UIJobGroupRegistration and the _ui_job_groups dict Propagated to the async-tasks and document-review examples, the job-group lifecycle tests, and the changelog.
A UIWorker is always a standalone, job-dispatched delegate -- it is never bridged into the voice pipeline where activate/deactivate (handoff) applies -- so active is vestigial. Hardcode active=True in the super() call and drop the param. Updates the __init__ Args + class docstring, the hello-snapshot worker docstring (referenced active=True), and the test call sites that passed active=False purely for isolation (inert -- all UI tests still pass).
…iew) Introduce BusUIDataMessage(BusDataMessage) and reparent all six UI carriers (BusUIEventMessage, BusUICommandMessage, the four BusUIJob*) to it. PipelineWorker.on_bus_message now dispatches on the base with a single isinstance check and delegates the RTVI translation to a new _handle_ui_bus_message; the method's docstring is generalized since it also handles BusTTSSpeakMessage (not just RTVI). Inbound carriers match no translation branch and remain a no-op.
138ef6b to
ed385dc
Compare
Helper to append text to an LLM service's system instruction. Used by UIWorker to append its wire-format prompt guide, but generally reusable.
UIWorker is an LLMContextWorker that observes and drives a client GUI over the RTVI UI channel: it stores live accessibility snapshots, auto-injects <ui_state> into the LLM context before each inference, dispatches client events to @ui_event handlers, and sends UI commands (scroll_to, highlight, select_text, click, set_input_value) back to the client. - Bus carriers (bus/ui/): a BusUIDataMessage base plus the UI event, command, and job-group lifecycle messages that UIWorker and PipelineWorker exchange. They live in the bus layer because both pipeline and workers reference them. - RTVI UI protocol: PipelineWorker connects a UIWorker to the client whenever RTVI is enabled -- its on_ui_message handler republishes inbound client messages onto the bus, and on_bus_message translates outbound UI carriers into RTVI frames (rtvi models/frames/observer/processor). - Delegate API: a single "respond" job; the worker chooses delivery when it completes the turn with respond_to_job -- data for the requester's voice LLM to phrase, or tts_speak=True for verbatim TTS via the requester's TTS. ReplyToolMixin bundles the standard reply tool; ui_job_group / start_ui_job_group fan work out to peer workers as cancellable job-group cards. Includes UI_STATE_PROMPT_GUIDE, the @ui_event decorator, tests, and changelog.
Seven multi-worker examples demonstrating UIWorker over the RTVI UI channel: pointing, deixis, document-review, form-fill, hello-snapshot, async-tasks, and shopping-list. Each pairs a voice pipeline with a UIWorker on its own worker.
Please describe the changes in your PR. If it is addressing an issue, please reference that as well.