ClassCastException: class AnthropicApi$MessageDeltaEvent cannot be cast to class AnthropicApi$ToolUseAggregationEvent #2200

Open
@mmazurkevich

Bug description
If the response from the claude-3-5-sonnet-20241022 model exceeds the max-tokens limit, the code fails with a confusing exception:

java.lang.ClassCastException: class org.springframework.ai.anthropic.api.AnthropicApi$MessageDeltaEvent cannot be cast to class org.springframework.ai.anthropic.api.AnthropicApi$ToolUseAggregationEvent (org.springframework.ai.anthropic.api.AnthropicApi$MessageDeltaEvent and org.springframework.ai.anthropic.api.AnthropicApi$ToolUseAggregationEvent are in unnamed module of loader 'app')
	at org.springframework.ai.anthropic.api.StreamHelper.mergeToolUseEvents(StreamHelper.java:73) ~[spring-ai-anthropic-1.0.0-M5.jar:1.0.0-M5]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoReduceSeed] :
	reactor.core.publisher.Flux.reduce(Flux.java:7737)
	org.springframework.ai.anthropic.api.AnthropicApi.lambda$chatCompletionStream$7(AnthropicApi.java:212)
Error has been observed at the following site(s):
	*___________Flux.reduce ⇢ at org.springframework.ai.anthropic.api.AnthropicApi.lambda$chatCompletionStream$7(AnthropicApi.java:212)
	*__________Flux.flatMap ⇢ at org.springframework.ai.anthropic.api.AnthropicApi.chatCompletionStream(AnthropicApi.java:216)
	|_             Flux.map ⇢ at org.springframework.ai.anthropic.api.AnthropicApi.chatCompletionStream(AnthropicApi.java:217)
	|_          Flux.filter ⇢ at org.springframework.ai.anthropic.api.AnthropicApi.chatCompletionStream(AnthropicApi.java:218)
	|_       Flux.switchMap ⇢ at org.springframework.ai.anthropic.AnthropicChatModel.lambda$internalStream$7(AnthropicChatModel.java:283)
	|_       Flux.doOnError ⇢ at org.springframework.ai.anthropic.AnthropicChatModel.lambda$internalStream$7(AnthropicChatModel.java:296)
	|_       Flux.doFinally ⇢ at org.springframework.ai.anthropic.AnthropicChatModel.lambda$internalStream$7(AnthropicChatModel.java:297)
	|_    Flux.contextWrite ⇢ at org.springframework.ai.anthropic.AnthropicChatModel.lambda$internalStream$7(AnthropicChatModel.java:298)
	|_   Flux.doOnSubscribe ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:94)
	|_        Flux.doOnNext ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:105)
	|_    Flux.doOnComplete ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:144)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:171)
	*__Flux.deferContextual ⇢ at org.springframework.ai.anthropic.AnthropicChatModel.internalStream(AnthropicChatModel.java:265)
	|_             Flux.map ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultChatClientRequestSpec$2.aroundStream(DefaultChatClient.java:694)
	|_       Flux.publishOn ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultChatClientRequestSpec$2.aroundStream(DefaultChatClient.java:695)
	*____________Flux.defer ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:124)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:125)
	|_       Flux.doFinally ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:126)
	|_    Flux.contextWrite ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:127)
	*__Flux.deferContextual ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.nextAroundStream(DefaultAroundAdvisorChain.java:103)
	|_             Flux.map ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregateAdvisedResponse(MessageAggregator.java:56)
	|_   Flux.doOnSubscribe ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:94)
	|_        Flux.doOnNext ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:105)
	|_    Flux.doOnComplete ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:144)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:171)
	|_             Flux.map ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregateAdvisedResponse(MessageAggregator.java:69)
	*____________Flux.defer ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:124)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:125)
	|_       Flux.doFinally ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:126)
	|_    Flux.contextWrite ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:127)
	*__Flux.deferContextual ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.nextAroundStream(DefaultAroundAdvisorChain.java:103)
	*______Mono.flatMapMany ⇢ at org.springframework.ai.chat.client.advisor.AbstractChatMemoryAdvisor.doNextWithProtectFromBlockingBefore(AbstractChatMemoryAdvisor.java:194)
	|_             Flux.map ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregateAdvisedResponse(MessageAggregator.java:56)
	|_   Flux.doOnSubscribe ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:94)
	|_        Flux.doOnNext ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:105)
	|_    Flux.doOnComplete ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:144)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregate(MessageAggregator.java:171)
	|_             Flux.map ⇢ at org.springframework.ai.chat.model.MessageAggregator.aggregateAdvisedResponse(MessageAggregator.java:69)
	*____________Flux.defer ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:124)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:125)
	|_       Flux.doFinally ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:126)
	|_    Flux.contextWrite ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextAroundStream$6(DefaultAroundAdvisorChain.java:127)
	*__Flux.deferContextual ⇢ at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.nextAroundStream(DefaultAroundAdvisorChain.java:103)
	|_             Flux.map ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:548)
	|_       Flux.doOnError ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:549)
	|_       Flux.doFinally ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:550)
	|_    Flux.contextWrite ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:551)
	*__Flux.deferContextual ⇢ at org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.doGetObservableFluxChatResponse(DefaultChatClient.java:527)
	|_      Flux.mapNotNull ⇢ at com.roux.panda.controller.AgentController.generationStream(AgentController.kt:41)
	|_             Flux.map ⇢ at com.roux.panda.controller.AgentController.generationStream(AgentController.kt:41)
	|_    Flux.contextWrite ⇢ at org.springframework.web.servlet.mvc.method.annotation.ReactiveTypeHandler$ContextSnapshotHelper.writeReactorContext(ReactiveTypeHandler.java:528)
Original Stack Trace:
		at org.springframework.ai.anthropic.api.StreamHelper.mergeToolUseEvents(StreamHelper.java:73) ~[spring-ai-anthropic-1.0.0-M5.jar:1.0.0-M5]
		at reactor.core.publisher.MonoReduceSeed$ReduceSeedSubscriber.onNext(MonoReduceSeed.java:116) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:670) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onNext(FluxWindowPredicate.java:790) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onNext(FluxWindowPredicate.java:268) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onNext(FluxTakeUntil.java:95) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:251) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:259) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:865) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:480) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:202) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:317) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:227) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:200) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:180) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:571) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:310) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256) ~[reactor-core-3.6.13.jar:3.6.13]
		at reactor.adapter.JdkFlowAdapter$SubscriberToRS.onNext(JdkFlowAdapter.java:150) ~[reactor-core-3.6.13.jar:3.6.13]
		at java.net.http/jdk.internal.net.http.ResponseSubscribers$PublishingBodySubscriber.onNext(ResponseSubscribers.java:1028) ~[java.net.http:na]
		at java.net.http/jdk.internal.net.http.ResponseSubscribers$PublishingBodySubscriber.onNext(ResponseSubscribers.java:868) ~[java.net.http:na]
		at java.net.http/jdk.internal.net.http.common.HttpBodySubscriberWrapper.onNext(HttpBodySubscriberWrapper.java:391) ~[java.net.http:na]
		at java.net.http/jdk.internal.net.http.common.HttpBodySubscriberWrapper.onNext(HttpBodySubscriberWrapper.java:49) ~[java.net.http:na]

It took a while to understand that the real problem is the spring.ai.anthropic.chat.options.max-tokens setting. While debugging the jsonToObject call in org.springframework.ai.anthropic.api.AnthropicApi.chatCompletionStream, I found that the last event received was this JSON:

{
  "type": "message_delta",
  "delta": {
    "stop_reason": "max_tokens",
    "stop_sequence": null
  },
  "usage": {
    "output_tokens": 500
  }
}
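For readers unfamiliar with this failure mode, here is a minimal sketch of one plausible way an unchecked cast inside a reduce step can produce this kind of ClassCastException. All types below are hypothetical stand-ins named after the classes in the stack trace. They are not the real Spring AI classes, and the actual logic in StreamHelper.mergeToolUseEvents may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the event types named in the stack trace.
// These are NOT the real Spring AI classes.
final class CastFailureSketch {

    interface StreamEvent {}

    record MessageDeltaEvent(String stopReason, int outputTokens) implements StreamEvent {}

    record ToolUseDeltaEvent(String partialJson) implements StreamEvent {}

    static final class ToolUseAggregationEvent implements StreamEvent {
        final List<String> parts = new ArrayList<>();
    }

    // Unchecked merge: assumes the accumulator is always an aggregation event.
    static StreamEvent mergeUnchecked(StreamEvent previous, StreamEvent current) {
        ToolUseAggregationEvent aggregate = (ToolUseAggregationEvent) previous;
        if (current instanceof ToolUseDeltaEvent delta) {
            aggregate.parts.add(delta.partialJson());
            return aggregate;
        }
        // After this return the accumulator is no longer an aggregation event.
        return current;
    }

    // Folds the events left to right, starting from an empty aggregation event.
    static StreamEvent reduce(List<StreamEvent> events) {
        StreamEvent accumulator = new ToolUseAggregationEvent();
        for (StreamEvent event : events) {
            accumulator = mergeUnchecked(accumulator, event);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        List<StreamEvent> events = List.of(
                new MessageDeltaEvent("max_tokens", 500),
                new ToolUseDeltaEvent("{}"));
        try {
            reduce(events);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: accumulator was a MessageDeltaEvent");
        }
    }
}
```

In this sketch, a pattern-matching check such as `previous instanceof ToolUseAggregationEvent aggregate` before the cast would avoid the exception. Whether that is the right fix for the library is a separate question.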

Environment

  • spring-ai-bom:1.0.0-M5
  • spring-ai-anthropic-1.0.0-M5

Steps to reproduce
Ask the claude-3-5-sonnet-20241022 model to generate a large file whose size exceeds the default spring.ai.anthropic.chat.options.max-tokens limit (500 tokens).

Expected behavior
An exception that states why the execution did not fully complete, so that debugging is not needed to find the cause.
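As a rough illustration of the kind of error being asked for, the sketch below checks the stop_reason of a message_delta payload and raises a descriptive exception when it is max_tokens. The record, exception, and method names are hypothetical and exist only for this example. They are not part of the Spring AI API.

```java
// Hypothetical sketch; the names below are illustrative, not Spring AI API.
final class StopReasonCheck {

    // Minimal model of the "message_delta" payload shown in the report.
    record MessageDelta(String stopReason, int outputTokens) {}

    // Descriptive error that names the cause and the relevant property.
    static final class MaxTokensExceededException extends RuntimeException {
        MaxTokensExceededException(int outputTokens) {
            super("Response truncated: stop_reason=max_tokens after " + outputTokens
                    + " output tokens; consider raising"
                    + " spring.ai.anthropic.chat.options.max-tokens");
        }
    }

    // Throws if the model stopped because it hit the token limit.
    static void requireComplete(MessageDelta delta) {
        if ("max_tokens".equals(delta.stopReason())) {
            throw new MaxTokensExceededException(delta.outputTokens());
        }
    }
}
```

With the payload from this report, `StopReasonCheck.requireComplete(new StopReasonCheck.MessageDelta("max_tokens", 500))` would throw an exception whose message mentions both the 500-token count and the property to adjust.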
