Description
Frames are getting reordered in the Pipeline, which breaks interruptions and the guarantees of event ordering.
Specifically, `TranscriptionFrame`s are being moved to AFTER `StopInterruptionFrame`. As a result, subsequently generated `TranscriptionFrame`s trigger repeated context generations without an interruption. The stacked replies mostly generate the same response, so the bot repeats a slightly different incantation of it over and over again.
I've made a deterministic way (on my machine at least) to recreate this:
Using this `FrameWrapper` frame processor:
```python
import asyncio
from typing import List, Type

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from loguru import logger


class FrameWrapper(FrameProcessor):
    """
    When it sees one of the target frames, it optionally emits the pre-frames first and
    the post-frames afterwards, all pushed in the same direction as the target frame.
    """

    def __init__(
        self,
        pre_frames: List[Type[Frame]],
        target_frames: List[Type[Frame]],
        post_frames: List[Type[Frame]],
        pre_frame_delay=0.1,
        target_frame_delay=0.0,
        post_frame_delay=0.1,
    ):
        logger.debug(f"initializing FrameWrapper with pre_frames={pre_frames} target_frames={target_frames} post_frames={post_frames}")
        super().__init__()
        self.pre_frames = pre_frames
        self.target_frames = target_frames
        self.post_frames = post_frames
        self.pre_frame_delay = pre_frame_delay
        self.target_frame_delay = target_frame_delay
        self.post_frame_delay = post_frame_delay

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if any(isinstance(frame, rule) for rule in self.target_frames):
            # matched a target frame: emit the pre-frames first
            for pre_frame in self.pre_frames:
                logger.debug(f"pushing pre frame {pre_frame} for target frame {frame}")
                await self.push_frame(pre_frame(), direction)
            # HACK SLEEP TO MAKE SURE THE ORIGINAL GOES AFTER (frames were getting reordered)
            await asyncio.sleep(self.pre_frame_delay)
            # push the original frame
            if self.target_frame_delay > 0:
                await asyncio.sleep(self.target_frame_delay)
            logger.debug(f"pushing original frame {frame}")
            await self.push_frame(frame, direction)
            # HACK SLEEP TO MAKE SURE THE ORIGINAL GOES FIRST (frames were getting reordered)
            await asyncio.sleep(self.post_frame_delay)
            for post_frame in self.post_frames:
                logger.debug(f"pushing post frame {post_frame} for target frame {frame}")
                await self.push_frame(post_frame(), direction)
            return
        # not a target frame: pass it through unchanged
        await self.push_frame(frame, direction)
```
And this `SimpleFrameLogger` processor:
```python
from typing import Any, Callable, List, Type

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from loguru import logger


class SimpleFrameLogger(FrameProcessor):
    """
    Simply logs when it gets a certain frame
    """

    def __init__(self, id: str, frame_types: List[Type[Frame]], cb: Callable[[Frame, FrameDirection], Any] | None = None):
        super().__init__()
        self.frame_types = frame_types
        self._id = id
        self.callback = cb

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if any(isinstance(frame, frame_type) for frame_type in self.frame_types):
            logger.debug("{} Got frame {} {}", self._id, frame, direction)
            if self.callback is not None:
                self.callback(frame, direction)
        # always forward the frame so the rest of the pipeline is unaffected
        await self.push_frame(frame, direction)
```
And this `FrameDropper` processor:
```python
from typing import List, Type

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from loguru import logger


class FrameDropper(FrameProcessor):
    """
    Drops any frame that is an instance of one of the rule types
    """

    def __init__(self, rules: List[Type[Frame]]):
        logger.debug(f"initializing FrameDropper with rules {rules}")
        super().__init__()
        self.rules = rules

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if any(isinstance(frame, rule) for rule in self.rules):
            # we matched, drop it
            logger.debug(f"dropping frame {frame} due to match")
            return
        await self.push_frame(frame, direction)
```
My setup drops the VAD-driven speaking and interruption frames, leaving the `TranscriptionFrame` as the only turn signal, so I fully control when interruptions are sent. The pipeline has these components:
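```python
from pipecat.frames.frames import (
    EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame,
    InterimTranscriptionFrame, StartInterruptionFrame, StopInterruptionFrame,
    TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline

# transport and stt are set up as in the example being tested
pipeline = Pipeline([
    transport.input(),
    # drop the VAD-driven start/stop frames so only TranscriptionFrame drives turns
    FrameDropper([UserStartedSpeakingFrame, StartInterruptionFrame, EmulateUserStartedSpeakingFrame]),
    FrameDropper([UserStoppedSpeakingFrame, StopInterruptionFrame, EmulateUserStoppedSpeakingFrame]),
    stt,
    # re-emit the start/stop frames around each TranscriptionFrame
    FrameWrapper(
        [StartInterruptionFrame, UserStartedSpeakingFrame, EmulateUserStartedSpeakingFrame],
        [TranscriptionFrame],
        [StopInterruptionFrame, UserStoppedSpeakingFrame, EmulateUserStoppedSpeakingFrame],
    ),
    SimpleFrameLogger("wrapped", [StartInterruptionFrame, UserStartedSpeakingFrame,
                                  StopInterruptionFrame, UserStoppedSpeakingFrame,
                                  InterimTranscriptionFrame, TranscriptionFrame,
                                  EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame]),
    # ... the rest of the example's processors
])
```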
Note: you could probably just disable the VAD instead, but this setup is a direct modification of the examples I've been encouraged to test against.
With all the delays in the `FrameWrapper` set to 0, I see this behavior.
As you can see, the `FrameWrapper` emits logs in the order we expect:
```
2025-03-01 10:09:02.365 | DEBUG | frame_wrapper:process_frame:26 - pushing pre frame <class 'pipecat.frames.frames.StartInterruptionFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.366 | DEBUG | frame_wrapper:process_frame:26 - pushing pre frame <class 'pipecat.frames.frames.UserStartedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.366 | DEBUG | frame_wrapper:process_frame:26 - pushing pre frame <class 'pipecat.frames.frames.EmulateUserStartedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.367 | DEBUG | frame_wrapper:process_frame:30 - pushing original frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.367 | DEBUG | frame_wrapper:process_frame:34 - pushing post frame <class 'pipecat.frames.frames.StopInterruptionFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.367 | DEBUG | frame_wrapper:process_frame:34 - pushing post frame <class 'pipecat.frames.frames.UserStoppedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.368 | DEBUG | frame_wrapper:process_frame:34 - pushing post frame <class 'pipecat.frames.frames.EmulateUserStoppedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
```
However, in the `SimpleFrameLogger` processor, which sits immediately after the processor that generates those frames, the transcription frame arrives after the stop frames:
```
2025-03-01 10:09:02.366 | DEBUG | simple_frame_logger:process_frame:23 - wrapped Got frame StartInterruptionFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.366 | DEBUG | simple_frame_logger:process_frame:23 - wrapped Got frame UserStartedSpeakingFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.367 | DEBUG | simple_frame_logger:process_frame:23 - wrapped Got frame EmulateUserStartedSpeakingFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.367 | DEBUG | simple_frame_logger:process_frame:23 - wrapped Got frame StopInterruptionFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.367 | DEBUG | simple_frame_logger:process_frame:23 - wrapped Got frame UserStoppedSpeakingFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.368 | DEBUG | simple_frame_logger:process_frame:23 - wrapped Got frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00) FrameDirection.DOWNSTREAM
```
It didn't move by just one position; it moved by two.
If I set `target_frame_delay` to even 0.01 (10 milliseconds), the ordering is fixed.
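One hypothesis consistent with these logs (an assumption, not confirmed against Pipecat's source here) is that the receiving processor handles system frames such as the interruption and speaking frames on a fast path as soon as they are pushed, while data frames like `TranscriptionFrame` go into a queue drained by a separate task. The pure-asyncio sketch below models that two-lane behavior; `TwoLaneReceiver`, `SYSTEM_FRAMES` and `run` are hypothetical names and none of this is Pipecat code.

```python
import asyncio

# Hypothetical model, NOT Pipecat's implementation: frame names in this set
# are handled inline (fast path); everything else is queued for a drain task.
SYSTEM_FRAMES = {
    "StartInterruptionFrame",
    "UserStartedSpeakingFrame",
    "StopInterruptionFrame",
    "UserStoppedSpeakingFrame",
}


class TwoLaneReceiver:
    def __init__(self) -> None:
        self.seen: list[str] = []  # order in which frames were actually handled
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.task = asyncio.create_task(self._drain())

    async def receive(self, name: str) -> None:
        if name in SYSTEM_FRAMES:
            self.seen.append(name)  # handled immediately, no yield to the loop
        else:
            await self.queue.put(name)  # handled later by _drain()

    async def _drain(self) -> None:
        while True:
            name = await self.queue.get()
            self.seen.append(name)
            self.queue.task_done()


async def run(yield_after_target: bool) -> list[str]:
    receiver = TwoLaneReceiver()
    for name in ["StartInterruptionFrame", "UserStartedSpeakingFrame"]:
        await receiver.receive(name)
    await receiver.receive("TranscriptionFrame")
    if yield_after_target:
        # mirrors a small target_frame_delay: lets the drain task run first
        await asyncio.sleep(0.01)
    for name in ["StopInterruptionFrame", "UserStoppedSpeakingFrame"]:
        await receiver.receive(name)
    await receiver.queue.join()  # wait until the queued frame is handled
    receiver.task.cancel()
    return receiver.seen


print(asyncio.run(run(False)))  # TranscriptionFrame ends up last
print(asyncio.run(run(True)))   # TranscriptionFrame stays between start and stop
```

In this model the queued frame is overtaken by every fast-path frame pushed before the drain task gets to run, so it can move by more than one position, and any yield after the target frame restores the expected order.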
This happens 100% of the time when testing on my laptop. It makes interruptions fail in a cascading way that creates an unimaginably bad experience: the bot repeats itself over and over again whenever the user generates more than a single `TranscriptionFrame`. I've seen this produce upwards of 4 repeats of a long response in production.
TBH I'm flabbergasted that nobody has encountered this before, considering we were hitting this scenario about 10% of the time in our simulation testing. I can't imagine that we have the most sophisticated implementation of Pipecat, nor the largest scale, but maybe nobody else is inspecting/reviewing calls and running simulation tests the way we are.
Because of this, we've had to disable the VAD completely and manage our own interruptions around STT frames and stop-speaking timeouts to ensure correct ordering. Since we started managing our own start/stop speaking frames, with small delays between them to preserve ordering, we haven't seen the repeating behavior.
I think it would also be easy to set up automated testing for this, since I was able to reproduce it entirely deterministically (at least across 20+ runs). Maybe this is an artifact of the async runtime; I haven't looked into how Python's works, but a reordering of 2 places rather than 1 seems a bit extreme to be just a runtime issue.
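A minimal sketch of the ordering check such a test could assert, assuming the test collects the class names of the frames seen by a logger placed right after the wrapper. `transcription_is_wrapped`, `START_FRAMES` and `STOP_FRAMES` are hypothetical names, not Pipecat APIs; the two sample orders are the ones from the logs in this issue.

```python
from typing import Sequence

# Hypothetical helper, not part of Pipecat: checks the order in which a
# downstream processor observed frames, given their class names.
START_FRAMES = ("StartInterruptionFrame", "UserStartedSpeakingFrame")
STOP_FRAMES = ("StopInterruptionFrame", "UserStoppedSpeakingFrame")


def transcription_is_wrapped(received: Sequence[str]) -> bool:
    """Return True if the first TranscriptionFrame arrives after every start
    frame and before every stop frame (first occurrence of each is used)."""
    try:
        last_start = max(received.index(name) for name in START_FRAMES)
        first_stop = min(received.index(name) for name in STOP_FRAMES)
        target = received.index("TranscriptionFrame")
    except ValueError:
        return False  # one of the expected frames never arrived
    return last_start < target < first_stop


# Order logged by SimpleFrameLogger with all FrameWrapper delays at 0
observed = [
    "StartInterruptionFrame", "UserStartedSpeakingFrame", "EmulateUserStartedSpeakingFrame",
    "StopInterruptionFrame", "UserStoppedSpeakingFrame", "TranscriptionFrame",
]
# Order in which FrameWrapper pushed the frames
expected = [
    "StartInterruptionFrame", "UserStartedSpeakingFrame", "EmulateUserStartedSpeakingFrame",
    "TranscriptionFrame",
    "StopInterruptionFrame", "UserStoppedSpeakingFrame", "EmulateUserStoppedSpeakingFrame",
]
print(transcription_is_wrapped(observed))  # False
print(transcription_is_wrapped(expected))  # True
```

In a real test the names could be collected through `SimpleFrameLogger`'s `cb` argument, for example `cb=lambda frame, direction: seen.append(type(frame).__name__)`, and the check run once the pipeline has processed a single `TranscriptionFrame`.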