Description
Duplicate from Azure SDK as I'm not sure where the issue lies exactly as while the exception is triggered in the Azure Open AI SDK, using the SDK directly does not cause this issue.
Azure/azure-sdk-for-java #40898
Bug description
In a Spring Boot WebFlux project, calling the stream method of Spring AI which then calls the Azure OpenAI SDK, it throws the exception
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
Exception or Stack Trace
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
Suppressed: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.7.jar:3.6.7]
at com.azure.core.http.netty.NettyAsyncHttpClient.sendSync(NettyAsyncHttpClient.java:198) ~[azure-core-http-netty-1.15.0.jar:1.15.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.HttpLoggingPolicy.processSync(HttpLoggingPolicy.java:175) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.implementation.http.policy.InstrumentationPolicy.processSync(InstrumentationPolicy.java:101) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.KeyCredentialPolicy.processSync(KeyCredentialPolicy.java:115) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.CookiePolicy.processSync(CookiePolicy.java:73) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.AddDatePolicy.processSync(AddDatePolicy.java:50) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.RetryPolicy.attemptSync(RetryPolicy.java:211) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.RetryPolicy.processSync(RetryPolicy.java:161) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.AddHeadersFromContextPolicy.processSync(AddHeadersFromContextPolicy.java:67) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.RequestIdPolicy.processSync(RequestIdPolicy.java:77) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.HttpPipelineSyncPolicy.processSync(HttpPipelineSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.policy.UserAgentPolicy.processSync(UserAgentPolicy.java:174) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:138) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.implementation.http.rest.SyncRestProxy.send(SyncRestProxy.java:62) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.implementation.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:83) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.implementation.http.rest.RestProxyBase.invoke(RestProxyBase.java:125) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.http.rest.RestProxy.invoke(RestProxy.java:97) ~[azure-core-1.49.0.jar:1.49.0]
at jdk.proxy2/jdk.proxy2.$Proxy55.getChatCompletionsSync(Unknown Source) ~[na:na]
at com.azure.ai.openai.implementation.OpenAIClientImpl.getChatCompletionsWithResponse(OpenAIClientImpl.java:1444) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
at com.azure.ai.openai.OpenAIClient.getChatCompletionsWithResponse(OpenAIClient.java:318) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
at com.azure.ai.openai.OpenAIClient.getChatCompletionsStream(OpenAIClient.java:732) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
at org.springframework.ai.azure.openai.AzureOpenAiChatModel.stream(AzureOpenAiChatModel.java:165) ~[spring-ai-azure-openai-1.0.0-M1.jar:1.0.0-M1]
at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.doGetFluxChatResponse(ChatClient.java:692) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.chatResponse(ChatClient.java:707) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
at com.example.aiplatform.conversations.api.resource.ChatResource.stream(ChatResource.java:30) ~[main/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198) ~[spring-webflux-6.1.10.jar:6.1.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.20.jar:1.1.20]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.20.jar:1.1.20]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:818) ~[reactor-netty-http-1.1.20.jar:1.1.20]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.20.jar:1.1.20]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:305) ~[reactor-netty-http-1.1.20.jar:1.1.20]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
To Reproduce
- Create a RestController
- Create a ChatClient bean in a class annotated with
@Configuration
and@EnableWebFlux
- Configure application.yml for Azure OpenAI
- Call your API endpoint
Environment
Spring AI version 1.0.0-M1
Java version 21
Spring Boot Web Flux stater 3.3.1
Expected behavior
Azure OpenAI chat streams a response
Minimal Complete Reproducible example
Create a new Spring Boot 3.3.1 project through initializer, add Spring AI and Azure Open AI dependencies and configure them in application.yaml for autoconfiguration.
spring:
application:
name: ai-platform-conversations-api
ai:
azure:
openai:
chat:
enabled: true
options:
deployment-name: gpt-35-turbo
api-key: xxxxx
endpoint: https://example.openai.azure.com/
embedding:
enabled: true
options:
deployment-name: text-embedding-ada-002
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
return builder.build();
}
@RestController
public class ChatResource {
private final ChatClient chatClient;
public ChatResource(ChatClient chatClient) {
this.chatClient = chatClient;
}
@PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> stream(@RequestBody String input) {
return chatClient.prompt().user(input).stream().chatResponse();
}
}
POST http://localhost:8080/stream
Content-Type: text/plain
Recite the first 6 paragraphs of war and peace