
[Streaming] queue_text_chunk dumps the entire message in Copilot instead of sending the streaming chunks #297

@vishal061994-hue

Description


I sideloaded an application built with the M365 Agents SDK and I am seeing the following issue with StreamingResponse.

I am sending streaming response text with queue_text_chunk in Python from a sideloaded app in Teams. The streaming is very inconsistent in both Copilot and the Teams personal app: the stream emits the entire message at once instead of sending it in chunks. The behavior is much more pronounced in Copilot than in the Teams app, and users now have to wait for the whole message to finish streaming before they see any response. I tried the Bot Framework as well and observed the same behavior. I tried the following samples:
Agent SDK sample: https://github.com/microsoft/Agents/tree/main/samples/python/azureai-streaming
Bot Framework sample: https://github.com/OfficeDev/Microsoft-Teams-Samples/tree/main/samples/bot-streaming/csharp
Is this an issue with sideloaded apps, or is this a known issue with streaming?
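
For reference, a stripped-down handler along these lines (a minimal sketch: the function name and the fixed chunks are placeholders, and only the queue_text_chunk / end_stream calls already used in my code below are assumed) should show whether the buffering happens on the channel side, independent of my agent backend and SSE parsing:

    import asyncio

    async def stream_fixed_chunks(context):
        # Queue a handful of known chunks with a short delay so any
        # channel-side buffering is easy to spot, independent of the agent backend.
        try:
            for i in range(10):
                context.streaming_response.queue_text_chunk(f"chunk {i} ")
                await asyncio.sleep(0.5)  # delay value is arbitrary
        finally:
            # Finalize the stream exactly as in the full handler below.
            await context.streaming_response.end_stream()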

The code I am using for streaming:

    # excerpt from the message handler; `time` and `logger` are imported at module level
    llm_streaming = False
    tool_start_times = {}  # tool_call_id -> start timestamp, used for duration reporting
    try:
        async for chunk in agent_client.invoke_agent_streaming(
            content=content,
            user_ldap=user_ldap,
            user_email=user_email,
            local_execution=True,
        ):
            if not chunk:
                continue

            # Decode chunk and parse as SSE
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)

            # Parse SSE line
            event = parse_sse_line(chunk_str.strip())
            if not event:
                continue

            event_type = event.get('type', 'unknown')
            payload = event.get('payload', {})

            # Handle different event types
            if event_type == 'llm.start' or event_type == 'llm.stream.start':
                model = payload.get('model', 'unknown')
                context.streaming_response.queue_informative_update(
                    f'💭 [LLM] Generating response with {model}...\n'
                )
                llm_streaming = True

            elif event_type == 'llm.stream.chunk':
                # Extract text from delta or content
                delta = payload.get('delta')
                content_chunk = payload.get('content')
                text = delta or content_chunk

                if text:
                    # Queue the actual text content
                    context.streaming_response.queue_text_chunk(text)

            elif event_type == 'llm.stream.end' or event_type == 'llm.end':
                finish_reason = payload.get('finish_reason', 'unknown')
                usage = payload.get('usage', {})

                # Only show completion if we were streaming
                if llm_streaming:
                    context.streaming_response.queue_informative_update(
                        f'✅ [LLM] Completed (reason: {finish_reason})\n'
                    )
                    llm_streaming = False

            elif event_type.startswith('agent.'):
                context.streaming_response.queue_informative_update(
                    f'🤖 [Sub-Agent]: {payload}\n'
                )

            elif event_type.startswith('tool.'):
                tool_call_id = payload.get('tool_call_id', 'unknown')
                # Store start time for duration calculation
                tool_start_times[tool_call_id] = time.time()

                # Format and queue informative update
                message = format_tool_started_message(event)
                context.streaming_response.queue_informative_update(message)

            elif event_type == 'workflow.completed':
                context.streaming_response.queue_informative_update('✨ [Workflow Completed]\n')

            elif event_type == 'workflow.failed':
                error = payload.get('error', 'Unknown error')
                context.streaming_response.queue_informative_update(f'❌ [Workflow Failed] {error}\n')

            elif event_type == 'error':
                error_type = payload.get('error_type', event.get('error_type', 'Unknown'))
                message = payload.get('message', event.get('message', 'Unknown error'))
                context.streaming_response.queue_informative_update(f'❌ [Error] {error_type}: {message}\n')

        logger.info('Streaming response completed successfully')
    except Exception as e:
        logger.error(f'Error during streaming: {e}', exc_info=True)
        context.streaming_response.queue_informative_update(f'\n\n❌ Error: {str(e)}\n')
    finally:
        await context.streaming_response.end_stream()
