Skip to content

fix(stream): use subscribeOn instead of publishOn for blocking stream sources#410

Merged
AlbumenJ merged 3 commits intoagentscope-ai:mainfrom
AlbumenJ:1231-stream
Dec 31, 2025
Merged

fix(stream): use subscribeOn instead of publishOn for blocking stream sources#410
AlbumenJ merged 3 commits intoagentscope-ai:mainfrom
AlbumenJ:1231-stream

Conversation

@AlbumenJ
Copy link
Collaborator

No description provided.

… sources

Change-Id: I447edd7c8e4efff4efa6fabc4bb89eb6583fc80a
… sources

Change-Id: Ia64cb92acf4330b46084657d34080eae144ed2c5
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes threading issues in streaming operations by replacing publishOn with subscribeOn when working with blocking stream sources. The change ensures that blocking I/O operations and synchronous data source iteration happen on appropriate thread pools rather than blocking reactive pipelines.

  • Corrects scheduler usage for Flux sources that perform blocking operations (HTTP streaming, Java Stream/Iterable conversion)
  • Applies the fix across multiple model implementations and HTTP transport layers
  • Updates 6 files with consistent scheduler pattern changes

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java Changes streaming HTTP SSE handling to use subscribeOn for blocking BufferedReader operations
agentscope-examples/model-request-compression/src/main/java/io/agentscope/examples/compression/extra/CompressingOkHttpTransport.java Same streaming fix for compressed HTTP transport variant
agentscope-core/src/main/java/io/agentscope/core/model/OpenAIChatModel.java Fixes scheduler for Flux.fromStream() conversion from OpenAI SDK's blocking Stream
agentscope-core/src/main/java/io/agentscope/core/model/GeminiChatModel.java Fixes scheduler for Flux.fromIterable() conversion from Gemini's ResponseStream
agentscope-core/src/main/java/io/agentscope/core/model/AnthropicChatModel.java Fixes scheduler for Flux.fromStream() conversion from Anthropic SDK's blocking Stream
agentscope-core/src/main/java/io/agentscope/core/agent/AgentBase.java Changes event stream creation scheduler, though this source is reactive rather than blocking

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

…se.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@codecov
Copy link

codecov bot commented Dec 31, 2025

Codecov Report

❌ Patch coverage is 20.00000% with 4 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
...a/io/agentscope/core/model/AnthropicChatModel.java 0.00% 2 Missing ⚠️
...java/io/agentscope/core/model/GeminiChatModel.java 0.00% 1 Missing ⚠️
...java/io/agentscope/core/model/OpenAIChatModel.java 0.00% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

@AlbumenJ AlbumenJ merged commit 6486596 into agentscope-ai:main Dec 31, 2025
3 of 4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant