Skip to content

Commit 28bceb3

Browse files
chemicLtzolov
authored andcommitted
Prevent blocking event loop calling tools
When streaming happens on an event loop, due to the fact that currently tools can only be synchronous and blocking, every tool call needs to be offloaded to a blocking-safe Scheduler. Resolves #2341 Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent 5a4e9f5 commit 28bceb3

File tree

10 files changed

+146
-97
lines changed

10 files changed

+146
-97
lines changed

models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.lang.Nullable;
4040
import reactor.core.publisher.Flux;
4141
import reactor.core.publisher.Mono;
42+
import reactor.core.scheduler.Schedulers;
4243

4344
import org.springframework.ai.anthropic.api.AnthropicApi;
4445
import org.springframework.ai.anthropic.api.AnthropicApi.AnthropicMessage;
@@ -351,17 +352,21 @@ public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCha
351352
ChatResponse chatResponse = toChatResponse(chatCompletionResponse, accumulatedUsage);
352353

353354
if (ToolCallingChatOptions.isInternalToolExecutionEnabled(prompt.getOptions()) && chatResponse.hasToolCalls() && chatResponse.hasFinishReasons(Set.of("tool_use"))) {
354-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
355-
if (toolExecutionResult.returnDirect()) {
356-
// Return tool execution result directly to the client.
357-
return Flux.just(ChatResponse.builder().from(chatResponse)
358-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
359-
.build());
360-
} else {
361-
// Send the tool execution result back to the model.
362-
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
363-
chatResponse);
364-
}
355+
// FIXME: bounded elastic needs to be used since tool calling
356+
// is currently only synchronous
357+
return Flux.defer(() -> {
358+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
359+
if (toolExecutionResult.returnDirect()) {
360+
// Return tool execution result directly to the client.
361+
return Flux.just(ChatResponse.builder().from(chatResponse)
362+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
363+
.build());
364+
} else {
365+
// Send the tool execution result back to the model.
366+
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
367+
chatResponse);
368+
}
369+
}).subscribeOn(Schedulers.boundedElastic());
365370
}
366371

367372
return Mono.just(chatResponse);

models/spring-ai-azure-openai/src/main/java/org/springframework/ai/azure/openai/AzureOpenAiChatModel.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.slf4j.Logger;
6060
import org.slf4j.LoggerFactory;
6161
import reactor.core.publisher.Flux;
62+
import reactor.core.scheduler.Schedulers;
6263

6364
import org.springframework.ai.chat.messages.AssistantMessage;
6465
import org.springframework.ai.chat.messages.Message;
@@ -404,20 +405,24 @@ public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCha
404405
return chatResponseFlux.flatMap(chatResponse -> {
405406
if (ToolCallingChatOptions.isInternalToolExecutionEnabled(prompt.getOptions())
406407
&& chatResponse.hasToolCalls()) {
407-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
408-
if (toolExecutionResult.returnDirect()) {
409-
// Return tool execution result directly to the client.
410-
return Flux.just(ChatResponse.builder()
411-
.from(chatResponse)
412-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
413-
.build());
414-
}
415-
else {
416-
// Send the tool execution result back to the model.
417-
return this.internalStream(
418-
new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
419-
chatResponse);
420-
}
408+
// FIXME: bounded elastic needs to be used since tool calling
409+
// is currently only synchronous
410+
return Flux.defer(() -> {
411+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
412+
if (toolExecutionResult.returnDirect()) {
413+
// Return tool execution result directly to the client.
414+
return Flux.just(ChatResponse.builder()
415+
.from(chatResponse)
416+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
417+
.build());
418+
}
419+
else {
420+
// Send the tool execution result back to the model.
421+
return this.internalStream(
422+
new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
423+
chatResponse);
424+
}
425+
}).subscribeOn(Schedulers.boundedElastic());
421426
}
422427

423428
Flux<ChatResponse> flux = Flux.just(chatResponse)

models/spring-ai-bedrock-converse/src/main/java/org/springframework/ai/bedrock/converse/BedrockProxyChatModel.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import reactor.core.publisher.Flux;
3737
import reactor.core.publisher.Sinks;
3838
import reactor.core.publisher.Sinks.EmitFailureHandler;
39+
import reactor.core.scheduler.Schedulers;
3940
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
4041
import software.amazon.awssdk.core.SdkBytes;
4142
import software.amazon.awssdk.core.document.Document;
@@ -703,21 +704,25 @@ private Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse perviousCh
703704
&& chatResponse.hasToolCalls()
704705
&& chatResponse.hasFinishReasons(Set.of(StopReason.TOOL_USE.toString()))) {
705706

706-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
707-
708-
if (toolExecutionResult.returnDirect()) {
709-
// Return tool execution result directly to the client.
710-
return Flux.just(ChatResponse.builder()
711-
.from(chatResponse)
712-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
713-
.build());
714-
}
715-
else {
716-
// Send the tool execution result back to the model.
717-
return this.internalStream(
718-
new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
719-
chatResponse);
720-
}
707+
// FIXME: bounded elastic needs to be used since tool calling
708+
// is currently only synchronous
709+
return Flux.defer(() -> {
710+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse);
711+
712+
if (toolExecutionResult.returnDirect()) {
713+
// Return tool execution result directly to the client.
714+
return Flux.just(ChatResponse.builder()
715+
.from(chatResponse)
716+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
717+
.build());
718+
}
719+
else {
720+
// Send the tool execution result back to the model.
721+
return this.internalStream(
722+
new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
723+
chatResponse);
724+
}
725+
}).subscribeOn(Schedulers.boundedElastic());
721726
}
722727
else {
723728
return Flux.just(chatResponse);

models/spring-ai-minimax/src/main/java/org/springframework/ai/minimax/MiniMaxChatModel.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.LoggerFactory;
3131
import reactor.core.publisher.Flux;
3232
import reactor.core.publisher.Mono;
33+
import reactor.core.scheduler.Schedulers;
3334

3435
import org.springframework.ai.chat.messages.AssistantMessage;
3536
import org.springframework.ai.chat.messages.MessageType;
@@ -337,10 +338,14 @@ public Flux<ChatResponse> stream(Prompt prompt) {
337338
Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
338339
if (!isProxyToolCalls(prompt, this.defaultOptions) && isToolCall(response,
339340
Set.of(ChatCompletionFinishReason.TOOL_CALLS.name(), ChatCompletionFinishReason.STOP.name()))) {
340-
var toolCallConversation = handleToolCalls(prompt, response);
341-
// Recursively call the stream method with the tool call message
342-
// conversation that contains the call responses.
343-
return this.stream(new Prompt(toolCallConversation, prompt.getOptions()));
341+
// FIXME: bounded elastic needs to be used since tool calling
342+
// is currently only synchronous
343+
return Flux.defer(() -> {
344+
var toolCallConversation = handleToolCalls(prompt, response);
345+
// Recursively call the stream method with the tool call message
346+
// conversation that contains the call responses.
347+
return this.stream(new Prompt(toolCallConversation, prompt.getOptions()));
348+
}).subscribeOn(Schedulers.boundedElastic());
344349
}
345350
return Flux.just(response);
346351
})

models/spring-ai-mistral-ai/src/main/java/org/springframework/ai/mistralai/MistralAiChatModel.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.lang.Nullable;
3636
import reactor.core.publisher.Flux;
3737
import reactor.core.publisher.Mono;
38+
import reactor.core.scheduler.Schedulers;
3839

3940
import org.springframework.ai.chat.messages.AssistantMessage;
4041
import org.springframework.ai.chat.messages.SystemMessage;
@@ -369,17 +370,21 @@ public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCha
369370
// @formatter:off
370371
Flux<ChatResponse> chatResponseFlux = chatResponse.flatMap(response -> {
371372
if (ToolCallingChatOptions.isInternalToolExecutionEnabled(prompt.getOptions()) && response.hasToolCalls()) {
372-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
373-
if (toolExecutionResult.returnDirect()) {
374-
// Return tool execution result directly to the client.
375-
return Flux.just(ChatResponse.builder().from(response)
376-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
377-
.build());
378-
} else {
379-
// Send the tool execution result back to the model.
380-
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
381-
response);
382-
}
373+
// FIXME: bounded elastic needs to be used since tool calling
374+
// is currently only synchronous
375+
return Flux.defer(() -> {
376+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
377+
if (toolExecutionResult.returnDirect()) {
378+
// Return tool execution result directly to the client.
379+
return Flux.just(ChatResponse.builder().from(response)
380+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
381+
.build());
382+
} else {
383+
// Send the tool execution result back to the model.
384+
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
385+
response);
386+
}
387+
}).subscribeOn(Schedulers.boundedElastic());
383388
}
384389
else {
385390
return Flux.just(response);

models/spring-ai-moonshot/src/main/java/org/springframework/ai/moonshot/MoonshotChatModel.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.LoggerFactory;
3030
import reactor.core.publisher.Flux;
3131
import reactor.core.publisher.Mono;
32+
import reactor.core.scheduler.Schedulers;
3233

3334
import org.springframework.ai.chat.messages.AssistantMessage;
3435
import org.springframework.ai.chat.messages.MessageType;
@@ -321,10 +322,14 @@ public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCha
321322
Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
322323
if (!isProxyToolCalls(prompt, this.defaultOptions) && isToolCall(response,
323324
Set.of(ChatCompletionFinishReason.TOOL_CALLS.name(), ChatCompletionFinishReason.STOP.name()))) {
324-
var toolCallConversation = handleToolCalls(prompt, response);
325-
// Recursively call the stream method with the tool call message
326-
// conversation that contains the call responses.
327-
return this.internalStream(new Prompt(toolCallConversation, prompt.getOptions()), response);
325+
// FIXME: bounded elastic needs to be used since tool calling
326+
// is currently only synchronous
327+
return Flux.defer(() -> {
328+
var toolCallConversation = handleToolCalls(prompt, response);
329+
// Recursively call the stream method with the tool call message
330+
// conversation that contains the call responses.
331+
return this.internalStream(new Prompt(toolCallConversation, prompt.getOptions()), response);
332+
}).subscribeOn(Schedulers.boundedElastic());
328333
}
329334
return Flux.just(response);
330335
})

models/spring-ai-ollama/src/main/java/org/springframework/ai/ollama/OllamaChatModel.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.ai.util.json.JsonParser;
3737
import org.springframework.lang.Nullable;
3838
import reactor.core.publisher.Flux;
39+
import reactor.core.scheduler.Schedulers;
3940

4041
import org.springframework.ai.chat.messages.AssistantMessage;
4142
import org.springframework.ai.chat.messages.SystemMessage;
@@ -341,17 +342,21 @@ private Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCh
341342
// @formatter:off
342343
Flux<ChatResponse> chatResponseFlux = chatResponse.flatMap(response -> {
343344
if (ToolCallingChatOptions.isInternalToolExecutionEnabled(prompt.getOptions()) && response.hasToolCalls()) {
344-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
345-
if (toolExecutionResult.returnDirect()) {
346-
// Return tool execution result directly to the client.
347-
return Flux.just(ChatResponse.builder().from(response)
348-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
349-
.build());
350-
} else {
351-
// Send the tool execution result back to the model.
352-
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
353-
response);
354-
}
345+
// FIXME: bounded elastic needs to be used since tool calling
346+
// is currently only synchronous
347+
return Flux.defer(() -> {
348+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
349+
if (toolExecutionResult.returnDirect()) {
350+
// Return tool execution result directly to the client.
351+
return Flux.just(ChatResponse.builder().from(response)
352+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
353+
.build());
354+
} else {
355+
// Send the tool execution result back to the model.
356+
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
357+
response);
358+
}
359+
}).subscribeOn(Schedulers.boundedElastic());
355360
}
356361
else {
357362
return Flux.just(response);

models/spring-ai-openai/src/main/java/org/springframework/ai/openai/OpenAiChatModel.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.lang.Nullable;
3838
import reactor.core.publisher.Flux;
3939
import reactor.core.publisher.Mono;
40+
import reactor.core.scheduler.Schedulers;
4041

4142
import org.springframework.ai.chat.messages.AssistantMessage;
4243
import org.springframework.ai.chat.messages.MessageType;
@@ -437,19 +438,22 @@ public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCha
437438

438439
// @formatter:off
439440
Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
440-
441441
if (ToolCallingChatOptions.isInternalToolExecutionEnabled(prompt.getOptions()) && response.hasToolCalls()) {
442-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
443-
if (toolExecutionResult.returnDirect()) {
444-
// Return tool execution result directly to the client.
445-
return Flux.just(ChatResponse.builder().from(response)
446-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
447-
.build());
448-
} else {
449-
// Send the tool execution result back to the model.
450-
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
451-
response);
452-
}
442+
return Flux.defer(() -> {
443+
// FIXME: bounded elastic needs to be used since tool calling
444+
// is currently only synchronous
445+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
446+
if (toolExecutionResult.returnDirect()) {
447+
// Return tool execution result directly to the client.
448+
return Flux.just(ChatResponse.builder().from(response)
449+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
450+
.build());
451+
} else {
452+
// Send the tool execution result back to the model.
453+
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
454+
response);
455+
}
456+
}).subscribeOn(Schedulers.boundedElastic());
453457
}
454458
else {
455459
return Flux.just(response);

models/spring-ai-vertex-ai-gemini/src/main/java/org/springframework/ai/vertexai/gemini/VertexAiGeminiChatModel.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.slf4j.LoggerFactory;
5050
import reactor.core.publisher.Flux;
5151
import reactor.core.publisher.Mono;
52+
import reactor.core.scheduler.Schedulers;
5253

5354
import org.springframework.ai.chat.messages.AssistantMessage;
5455
import org.springframework.ai.chat.messages.Message;
@@ -545,18 +546,22 @@ public Flux<ChatResponse> internalStream(Prompt prompt) {
545546
}));
546547

547548
// @formatter:off
548-
Flux<ChatResponse> chatResponseFlux = chatResponse1.flatMap(response -> {
549+
Flux<ChatResponse> chatResponseFlux = chatResponse1.flatMap(response -> {
549550
if (ToolCallingChatOptions.isInternalToolExecutionEnabled(prompt.getOptions()) && response.hasToolCalls()) {
550-
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
551-
if (toolExecutionResult.returnDirect()) {
552-
// Return tool execution result directly to the client.
553-
return Flux.just(ChatResponse.builder().from(response)
554-
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
555-
.build());
556-
} else {
557-
// Send the tool execution result back to the model.
558-
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()));
559-
}
551+
// FIXME: bounded elastic needs to be used since tool calling
552+
// is currently only synchronous
553+
return Flux.defer(() -> {
554+
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
555+
if (toolExecutionResult.returnDirect()) {
556+
// Return tool execution result directly to the client.
557+
return Flux.just(ChatResponse.builder().from(response)
558+
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
559+
.build());
560+
} else {
561+
// Send the tool execution result back to the model.
562+
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()));
563+
}
564+
}).subscribeOn(Schedulers.boundedElastic());
560565
}
561566
else {
562567
return Flux.just(response);

models/spring-ai-zhipuai/src/main/java/org/springframework/ai/zhipuai/ZhiPuAiChatModel.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.slf4j.LoggerFactory;
3232
import reactor.core.publisher.Flux;
3333
import reactor.core.publisher.Mono;
34+
import reactor.core.scheduler.Schedulers;
3435

3536
import org.springframework.ai.chat.messages.AssistantMessage;
3637
import org.springframework.ai.chat.messages.MessageType;
@@ -306,10 +307,14 @@ public Flux<ChatResponse> stream(Prompt prompt) {
306307
// @formatter:off
307308
Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
308309
if (!isProxyToolCalls(prompt, this.defaultOptions) && isToolCall(response, Set.of(ChatCompletionFinishReason.TOOL_CALLS.name(), ChatCompletionFinishReason.STOP.name()))) {
309-
var toolCallConversation = handleToolCalls(prompt, response);
310-
// Recursively call the stream method with the tool call message
311-
// conversation that contains the call responses.
312-
return this.stream(new Prompt(toolCallConversation, prompt.getOptions()));
310+
// FIXME: bounded elastic needs to be used since tool calling
311+
// is currently only synchronous
312+
return Flux.defer(() -> {
313+
var toolCallConversation = handleToolCalls(prompt, response);
314+
// Recursively call the stream method with the tool call message
315+
// conversation that contains the call responses.
316+
return this.stream(new Prompt(toolCallConversation, prompt.getOptions()));
317+
}).subscribeOn(Schedulers.boundedElastic());
313318
}
314319
return Flux.just(response);
315320
})

0 commit comments

Comments
 (0)