
Frame reordering breaks interrupts and causes bot to repeat itself multiple times #1323

Open
@danthegoodman1

Description


Frames are getting reordered in the Pipeline, which breaks interruptions and the guarantees of event ordering.

Specifically, TranscriptionFrames are being moved to AFTER the StopInterruptionFrame. As a result, each subsequent TranscriptionFrame triggers another context generation without an interruption, so replies stack up. Those replies are mostly the same response, so the bot repeats slightly different incantations of it over and over again.

I've made a deterministic way (on my machine, at least) to recreate this:

Using this FrameWrapper frame processor:

import asyncio
from typing import List, Type

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from loguru import logger

class FrameWrapper(FrameProcessor):
    """
    When it sees one of the target frames, it optionally emits the pre-frames first, then the target frame,
    then optionally the post-frames, all pushed in the same direction as the target frame.
    """

    def __init__(
        self,
        pre_frames: List[Type[Frame]],
        target_frames: List[Type[Frame]],
        post_frames: List[Type[Frame]],
        pre_frame_delay: float = 0.1,
        target_frame_delay: float = 0.0,
        post_frame_delay: float = 0.1,
    ):
        logger.debug(f"initializing FrameWrapper with pre_frames={pre_frames} target_frames={target_frames} post_frames={post_frames}")
        super().__init__()
        self.pre_frames = pre_frames
        self.target_frames = target_frames
        self.post_frames = post_frames
        self.pre_frame_delay = pre_frame_delay
        self.target_frame_delay = target_frame_delay
        self.post_frame_delay = post_frame_delay

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if any(isinstance(frame, rule) for rule in self.target_frames):
            # we matched: push the pre-frames, then the original frame, then the post-frames
            for pre_frame in self.pre_frames:
                logger.debug(f"pushing pre frame {pre_frame} for target frame {frame}")
                await self.push_frame(pre_frame(), direction)

            # HACK SLEEP TO MAKE SURE THE ORIGINAL GOES AFTER (frames were getting reordered)
            await asyncio.sleep(self.pre_frame_delay)

            # push the frame
            if self.target_frame_delay > 0:
                await asyncio.sleep(self.target_frame_delay)
            logger.debug(f"pushing original frame {frame}")
            await self.push_frame(frame, direction)

            # HACK SLEEP TO MAKE SURE THE ORIGINAL GOES FIRST (frames were getting reordered)
            await asyncio.sleep(self.post_frame_delay)

            for post_frame in self.post_frames:
                logger.debug(f"pushing post frame {post_frame} for target frame {frame}")
                await self.push_frame(post_frame(), direction)
            return

        await self.push_frame(frame, direction)

And this SimpleFrameLogger processor:

from typing import List, Type, Callable, Any

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection

from loguru import logger

class SimpleFrameLogger(FrameProcessor):
    """
    Simply logs when it gets a certain frame
    """

    def __init__(self, id: str, frame_types: List[Type[Frame]], cb: Callable[[Frame, FrameDirection], Any] | None = None):
        super().__init__()
        self.frame_types = frame_types
        self._id = id
        self.callback = cb

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if any(isinstance(frame, frame_type) for frame_type in self.frame_types):
            logger.debug("{} Got frame {} {}", self._id, frame, direction)
            if self.callback is not None:
                self.callback(frame, direction)

        await self.push_frame(frame, direction)

And this FrameDropper processor:

from typing import List, Type

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from loguru import logger

class FrameDropper(FrameProcessor):
    """
    Drops any frame whose type matches one of the rules
    """

    def __init__(self, rules: List[Type[Frame]]):
        logger.debug(f"initializing FrameDropper with rules {rules}")
        super().__init__()
        self.rules = rules

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if any(isinstance(frame, rule) for rule in self.rules):
            # we matched, drop it
            logger.debug(f"dropping frame {frame} due to match")
            return

        await self.push_frame(frame, direction)

In my setup, I drop all of the speaking and interruption frames, keeping only the TranscriptionFrame, so that I fully control when interrupts are sent. The pipeline has these components:

transport.input(),
FrameDropper([UserStartedSpeakingFrame, StartInterruptionFrame, EmulateUserStartedSpeakingFrame]),
FrameDropper([UserStoppedSpeakingFrame, StopInterruptionFrame, EmulateUserStoppedSpeakingFrame]),
stt,
FrameWrapper(
    [StartInterruptionFrame, UserStartedSpeakingFrame, EmulateUserStartedSpeakingFrame],
    [TranscriptionFrame],
    [StopInterruptionFrame, UserStoppedSpeakingFrame, EmulateUserStoppedSpeakingFrame],
),
SimpleFrameLogger("wrapped", [
    StartInterruptionFrame, UserStartedSpeakingFrame,
    StopInterruptionFrame, UserStoppedSpeakingFrame,
    InterimTranscriptionFrame, TranscriptionFrame,
    EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame,
]),

Note: you could probably just disable the VAD instead, but this setup is a direct modification of the examples I've been encouraged to test against.

With all the delays in the FrameWrapper set to 0, I see this behavior:


As you can see, the FrameWrapper emits logs in the order we expect:

2025-03-01 10:09:02.365 | DEBUG    | frame_wrapper:process_frame:26 - pushing pre frame <class 'pipecat.frames.frames.StartInterruptionFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.366 | DEBUG    | frame_wrapper:process_frame:26 - pushing pre frame <class 'pipecat.frames.frames.UserStartedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.366 | DEBUG    | frame_wrapper:process_frame:26 - pushing pre frame <class 'pipecat.frames.frames.EmulateUserStartedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.367 | DEBUG    | frame_wrapper:process_frame:30 - pushing original frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.367 | DEBUG    | frame_wrapper:process_frame:34 - pushing post frame <class 'pipecat.frames.frames.StopInterruptionFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.367 | DEBUG    | frame_wrapper:process_frame:34 - pushing post frame <class 'pipecat.frames.frames.UserStoppedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)
2025-03-01 10:09:02.368 | DEBUG    | frame_wrapper:process_frame:34 - pushing post frame <class 'pipecat.frames.frames.EmulateUserStoppedSpeakingFrame'> for target frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00)

However, in the SimpleFrameLogger, which is the processor immediately after the FrameWrapper that generates those frames, the TranscriptionFrame arrives after the stop frames:

2025-03-01 10:09:02.366 | DEBUG    | simple_frame_logger:process_frame:23 - wrapped Got frame StartInterruptionFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.366 | DEBUG    | simple_frame_logger:process_frame:23 - wrapped Got frame UserStartedSpeakingFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.367 | DEBUG    | simple_frame_logger:process_frame:23 - wrapped Got frame EmulateUserStartedSpeakingFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.367 | DEBUG    | simple_frame_logger:process_frame:23 - wrapped Got frame StopInterruptionFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.367 | DEBUG    | simple_frame_logger:process_frame:23 - wrapped Got frame UserStoppedSpeakingFrame#0 FrameDirection.DOWNSTREAM
2025-03-01 10:09:02.368 | DEBUG    | simple_frame_logger:process_frame:23 - wrapped Got frame TranscriptionFrame#0(user: , text: [Hello?], language: None, timestamp: 2025-03-01T18:09:02.365+00:00) FrameDirection.DOWNSTREAM

It didn't move by just one position; it moved by two.
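To make "two positions" concrete, here is a small helper (hypothetical, just for illustration) that lists which frames were pushed after the TranscriptionFrame but received before it. The frame order is copied from the two logs above; the logger output as pasted ends at the TranscriptionFrame, so EmulateUserStoppedSpeakingFrame is not in the received list.

```python
from typing import List

# Push order, copied from the FrameWrapper log lines above.
SENT: List[str] = [
    "StartInterruptionFrame",
    "UserStartedSpeakingFrame",
    "EmulateUserStartedSpeakingFrame",
    "TranscriptionFrame",
    "StopInterruptionFrame",
    "UserStoppedSpeakingFrame",
    "EmulateUserStoppedSpeakingFrame",
]

# Arrival order, copied from the SimpleFrameLogger log lines above.
RECEIVED: List[str] = [
    "StartInterruptionFrame",
    "UserStartedSpeakingFrame",
    "EmulateUserStartedSpeakingFrame",
    "StopInterruptionFrame",
    "UserStoppedSpeakingFrame",
    "TranscriptionFrame",
]


def overtaken_by(sent: List[str], received: List[str], target: str) -> List[str]:
    """Return the frames that were pushed after `target` but arrived before it."""
    pushed_after = set(sent[sent.index(target) + 1:])
    arrived_before = received[: received.index(target)]
    return [name for name in arrived_before if name in pushed_after]


print(overtaken_by(SENT, RECEIVED, "TranscriptionFrame"))
# ['StopInterruptionFrame', 'UserStoppedSpeakingFrame']
```

The two frames that jump ahead are exactly the ones that close the interruption window, which is why the transcription ends up outside it.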

If I set the target_frame_delay to even 0.01 (10 milliseconds), the ordering is fixed.
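One mechanism that would produce this exact pattern, stated purely as an assumption (I haven't traced Pipecat's internals, and this is not its code): if some frame types are handed to the next processor immediately while others wait in a per-processor queue drained by a separate task, then anything pushed in the same event-loop turn overtakes the queued frame, and any small sleep gives the consumer task a chance to run first. A toy asyncio model of that idea follows; `ToyFrame`, `ToyProcessor`, `is_system`, and `run` are all made-up names, and `yield_after_data` is only an analogue of the small-delay workaround, not the FrameWrapper's exact parameters.

```python
import asyncio
import contextlib
from dataclasses import dataclass
from typing import List


@dataclass
class ToyFrame:
    name: str
    is_system: bool  # hypothetical flag: True means "skip the queue"


class ToyProcessor:
    """Hypothetical downstream processor, NOT Pipecat's implementation:
    'system' frames are recorded immediately, other frames wait in a queue
    until a separate consumer task gets scheduled."""

    def __init__(self) -> None:
        self.received: List[str] = []
        self.queue: asyncio.Queue = asyncio.Queue()

    async def consume(self) -> None:
        while True:
            frame = await self.queue.get()
            self.received.append(frame.name)
            self.queue.task_done()

    async def queue_frame(self, frame: ToyFrame) -> None:
        if frame.is_system:
            self.received.append(frame.name)  # fast path: same event-loop turn
        else:
            await self.queue.put(frame)  # unbounded queue, so put() never yields


async def run(yield_after_data: float) -> List[str]:
    proc = ToyProcessor()
    consumer = asyncio.create_task(proc.consume())
    await proc.queue_frame(ToyFrame("StartInterruptionFrame", True))
    await proc.queue_frame(ToyFrame("TranscriptionFrame", False))
    if yield_after_data > 0:
        # Give the consumer task a chance to drain the queue first.
        await asyncio.sleep(yield_after_data)
    await proc.queue_frame(ToyFrame("StopInterruptionFrame", True))
    await proc.queue_frame(ToyFrame("UserStoppedSpeakingFrame", True))
    await proc.queue.join()  # wait until the queued frame has been delivered
    consumer.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await consumer
    return proc.received
```

In this model, `asyncio.run(run(0.0))` delivers the TranscriptionFrame last, after both stop frames, while `asyncio.run(run(0.01))` delivers it between the start and stop frames. The sleep only lets the consumer win the race; it does not give any ordering guarantee by itself.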

This happens 100% of the time in my testing on my laptop. It causes interrupts to fail completely, in a cascading way that creates an unimaginably bad experience: the bot repeats itself over and over again if the user generates more than a single TranscriptionFrame. I've seen this produce upwards of 4 repeats of a long response in production.

TBH, I'm flabbergasted that nobody has run into this earlier, considering we were hitting this scenario about 10% of the time in our simulation testing. I cannot imagine that we have the most sophisticated implementation of Pipecat, nor the largest scale, but maybe nobody else is inspecting/reviewing calls and running simulation tests like we are.

We've had to completely disable the VAD and use our own managed interruptions, driven by STT frames and stop-speaking timeouts, to ensure correct ordering. Since we started managing our own start/stop speaking frames, with small delays between them to preserve ordering, we haven't seen the repeating behavior.

I think it would be easy to set up automated testing for this as well, since I was able to reproduce it entirely deterministically (at least across 20+ runs). Maybe this is an artifact of the async runtime; I haven't looked into how Python's works. But the reordering seems a bit extreme (two positions, not just one) to be just a runtime issue.
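As a sketch of such a test, assume the names of the frames are collected in arrival order; one way could be the SimpleFrameLogger callback, e.g. `cb=lambda frame, direction: received.append(type(frame).__name__)`. For this setup, where the FrameWrapper brackets every TranscriptionFrame with start/stop frames, a hypothetical invariant check (the function name is made up) could look like:

```python
from typing import Iterable


def transcriptions_inside_interruptions(received: Iterable[str]) -> bool:
    """Hypothetical ordering check: every TranscriptionFrame must arrive after a
    StartInterruptionFrame and before the StopInterruptionFrame that closes it."""
    in_interruption = False
    for name in received:
        if name == "StartInterruptionFrame":
            in_interruption = True
        elif name == "StopInterruptionFrame":
            in_interruption = False
        elif name == "TranscriptionFrame" and not in_interruption:
            return False  # the transcription escaped its interruption window
    return True


# Order the FrameWrapper pushes (from its log lines).
expected = [
    "StartInterruptionFrame", "UserStartedSpeakingFrame", "EmulateUserStartedSpeakingFrame",
    "TranscriptionFrame",
    "StopInterruptionFrame", "UserStoppedSpeakingFrame", "EmulateUserStoppedSpeakingFrame",
]
# Order the SimpleFrameLogger observed (from its log lines).
observed = [
    "StartInterruptionFrame", "UserStartedSpeakingFrame", "EmulateUserStartedSpeakingFrame",
    "StopInterruptionFrame", "UserStoppedSpeakingFrame",
    "TranscriptionFrame",
]

print(transcriptions_inside_interruptions(expected))  # True
print(transcriptions_inside_interruptions(observed))  # False
```

Running the reproduction pipeline repeatedly and asserting this check on each run would turn the deterministic repro into a regression test.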
