Skip to content

[BUG] [Azure OpenAI] block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2 #981

Closed
@nspyke

Description

@nspyke

Duplicate from Azure SDK as I'm not sure where the issue lies exactly as while the exception is triggered in the Azure Open AI SDK, using the SDK directly does not cause this issue.

Azure/azure-sdk-for-java #40898

Bug description
In a Spring Boot WebFlux project, calling the stream method of Spring AI which then calls the Azure OpenAI SDK, it throws the exception
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

Exception or Stack Trace

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
	Suppressed: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.7.jar:3.6.7]
		at com.azure.core.http.netty.NettyAsyncHttpClient.sendSync(NettyAsyncHttpClient.java:198) ~[azure-core-http-netty-1.15.0.jar:1.15.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.HttpLoggingPolicy.processSync(HttpLoggingPolicy.java:175) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.policy.InstrumentationPolicy.processSync(InstrumentationPolicy.java:101) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.KeyCredentialPolicy.processSync(KeyCredentialPolicy.java:115) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.CookiePolicy.processSync(CookiePolicy.java:73) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.AddDatePolicy.processSync(AddDatePolicy.java:50) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.RetryPolicy.attemptSync(RetryPolicy.java:211) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.RetryPolicy.processSync(RetryPolicy.java:161) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.AddHeadersFromContextPolicy.processSync(AddHeadersFromContextPolicy.java:67) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.RequestIdPolicy.processSync(RequestIdPolicy.java:77) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.HttpPipelineSyncPolicy.processSync(HttpPipelineSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.UserAgentPolicy.processSync(UserAgentPolicy.java:174) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:138) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.rest.SyncRestProxy.send(SyncRestProxy.java:62) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:83) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.rest.RestProxyBase.invoke(RestProxyBase.java:125) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.rest.RestProxy.invoke(RestProxy.java:97) ~[azure-core-1.49.0.jar:1.49.0]
		at jdk.proxy2/jdk.proxy2.$Proxy55.getChatCompletionsSync(Unknown Source) ~[na:na]
		at com.azure.ai.openai.implementation.OpenAIClientImpl.getChatCompletionsWithResponse(OpenAIClientImpl.java:1444) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
		at com.azure.ai.openai.OpenAIClient.getChatCompletionsWithResponse(OpenAIClient.java:318) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
		at com.azure.ai.openai.OpenAIClient.getChatCompletionsStream(OpenAIClient.java:732) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
		at org.springframework.ai.azure.openai.AzureOpenAiChatModel.stream(AzureOpenAiChatModel.java:165) ~[spring-ai-azure-openai-1.0.0-M1.jar:1.0.0-M1]
		at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.doGetFluxChatResponse(ChatClient.java:692) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
		at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.chatResponse(ChatClient.java:707) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
		at com.example.aiplatform.conversations.api.resource.ChatResource.stream(ChatResource.java:30) ~[main/:na]
		at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
		at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
		at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198) ~[spring-webflux-6.1.10.jar:6.1.10]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.20.jar:1.1.20]
		at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.20.jar:1.1.20]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:818) ~[reactor-netty-http-1.1.20.jar:1.1.20]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.20.jar:1.1.20]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:305) ~[reactor-netty-http-1.1.20.jar:1.1.20]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
		at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

To Reproduce

  • Create a RestController
  • Create a ChatClient bean in a class annotated with @Configuration and @EnableWebFlux
  • Configure application.yml for Azure OpenAI
  • Call your API endpoint

Environment
Spring AI version 1.0.0-M1
Java version 21
Spring Boot Web Flux stater 3.3.1

Expected behavior
Azure OpenAI chat streams a response

Minimal Complete Reproducible example
Create a new Spring Boot 3.3.1 project through initializer, add Spring AI and Azure Open AI dependencies and configure them in application.yaml for autoconfiguration.

spring:
  application:
    name: ai-platform-conversations-api
  ai:
    azure:
      openai:
        chat:
          enabled: true
          options:
            deployment-name: gpt-35-turbo
        api-key: xxxxx
        endpoint: https://example.openai.azure.com/
        embedding:
          enabled: true
          options:
            deployment-name: text-embedding-ada-002
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
        return builder.build();
}
@RestController
public class ChatResource {
    private final ChatClient chatClient;

    public ChatResource(ChatClient chatClient) {
        this.chatClient = chatClient;
    }
    @PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> stream(@RequestBody String input) {
        return chatClient.prompt().user(input).stream().chatResponse();
    }
}
POST http://localhost:8080/stream
Content-Type: text/plain

Recite the first 6 paragraphs of war and peace

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions