Deserialisation of AssistantStreamEventType.THREAD_MESSAGE_DELTA fails #421

Open
@miguel-uvicuo

Description

Deserialisation of AssistantStreamEventType.THREAD_MESSAGE_DELTA fails.

The reason is that the content of the message delta object sent by OpenAI is an array, as specified in the OpenAI API spec:

https://platform.openai.com/docs/api-reference/assistants-streaming/message-delta-object

However, in the class MessageDeltaData, content is declared as a single MessageContent rather than an array:

    @BetaOpenAI
    @Serializable
    public data class MessageDeltaData(
        @SerialName("role") val role: Role,
        @SerialName("content") val content: MessageContent
    )

Given the above, it is expected that deserialising the JSON object sent by OpenAI makes the library throw the following error:

Expected class kotlinx.serialization.json.JsonObject as the serialized body of com.aallam.openai.api.message.MessageContent, but had class kotlinx.serialization.json.JsonArray
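For illustration, here is a minimal sketch of a model whose content field is a list, which is the shape the error message implies is needed. All type names ending in Sketch are hypothetical stand-ins, not the library's classes. The field layout follows the example in the linked API reference, and making role optional is my own assumption. It requires the kotlinx.serialization compiler plugin.

```kotlin
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

// Hypothetical stand-in for the text payload of one content part.
@Serializable
data class TextDeltaSketch(
    @SerialName("value") val value: String? = null,
)

// Hypothetical stand-in for one element of the "content" array.
@Serializable
data class ContentPartSketch(
    @SerialName("index") val index: Int,
    @SerialName("type") val type: String,
    @SerialName("text") val text: TextDeltaSketch? = null,
)

// The key difference from MessageDeltaData: content is a List.
// role is optional here, which is an assumption of this sketch.
@Serializable
data class MessageDeltaDataSketch(
    @SerialName("role") val role: String? = null,
    @SerialName("content") val content: List<ContentPartSketch> = emptyList(),
)

// Unknown keys such as "annotations" are ignored rather than rejected.
private val sketchJson = Json { ignoreUnknownKeys = true }

fun parseDeltaSketch(json: String): MessageDeltaDataSketch =
    sketchJson.decodeFromString(MessageDeltaDataSketch.serializer(), json)

fun main() {
    val delta = """{"content":[{"index":0,"type":"text","text":{"value":"Hello","annotations":[]}}]}"""
    val parsed = parseDeltaSketch(delta)
    println(parsed.content.first().text?.value) // prints "Hello"
}
```

A real fix in the library would presumably keep its own Role and content types; this only shows that an array-typed field decodes the payload without the JsonArray error.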

Full exception message:

kotlinx.serialization.json.internal.JsonDecodingException: Expected class kotlinx.serialization.json.JsonObject as the serialized body of com.aallam.openai.api.message.MessageContent, but had class kotlinx.serialization.json.JsonArray
	at kotlinx.serialization.json.internal.JsonExceptionsKt.JsonDecodingException(JsonExceptions.kt:24)
	at kotlinx.serialization.json.internal.PolymorphicKt.decodeSerializableValuePolymorphic(Polymorphic.kt:106)
	at kotlinx.serialization.json.internal.StreamingJsonDecoder.decodeSerializableValue(StreamingJsonDecoder.kt:75)
	at kotlinx.serialization.encoding.AbstractDecoder.decodeSerializableValue(AbstractDecoder.kt:43)
	at kotlinx.serialization.encoding.AbstractDecoder.decodeSerializableElement(AbstractDecoder.kt:70)
	at kotlinx.serialization.json.internal.StreamingJsonDecoder.decodeSerializableElement(StreamingJsonDecoder.kt:168)
	at com.aallam.openai.api.run.MessageDeltaData$$serializer.deserialize(AssistantStreamEvent.kt:77)
	at com.aallam.openai.api.run.MessageDeltaData$$serializer.deserialize(AssistantStreamEvent.kt:77)
	at kotlinx.serialization.json.internal.StreamingJsonDecoder.decodeSerializableValue(StreamingJsonDecoder.kt:69)
	at kotlinx.serialization.encoding.AbstractDecoder.decodeSerializableValue(AbstractDecoder.kt:43)
	at kotlinx.serialization.encoding.AbstractDecoder.decodeSerializableElement(AbstractDecoder.kt:70)
	at kotlinx.serialization.json.internal.StreamingJsonDecoder.decodeSerializableElement(StreamingJsonDecoder.kt:168)
	at com.aallam.openai.api.run.MessageDelta$$serializer.deserialize(AssistantStreamEvent.kt:64)
	at com.aallam.openai.api.run.MessageDelta$$serializer.deserialize(AssistantStreamEvent.kt:64)
	at kotlinx.serialization.json.internal.StreamingJsonDecoder.decodeSerializableValue(StreamingJsonDecoder.kt:69)
	at kotlinx.serialization.json.Json.decodeFromString(Json.kt:107)
	at com.aallam.openai.client.extension.AssistantStreamEventKt.getData(AssistantStreamEvent.kt:29)
	at com.aallam.openai.client.extension.AssistantStreamEventKt.getData(AssistantStreamEvent.kt:17)
	at com.uvicuo.fleetManager.adapters.openai.OpenAIAssistantService$sendRealTimeMessage$2$1.emit(OpenAIAssistantService.kt:75)
	at com.uvicuo.fleetManager.adapters.openai.OpenAIAssistantService$sendRealTimeMessage$2$1.emit(OpenAIAssistantService.kt:62)
	at com.aallam.openai.client.internal.http.HttpTransport$performSse$$inlined$map$1$2.emit(Emitters.kt:50)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:33)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt.access$emitAllImpl$FlowKt__ChannelsKt(Channels.kt:1)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Channels.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:65)
	at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:241)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:159)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:466)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:500)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:489)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:364)
	at io.ktor.utils.io.ByteChannel$Slot$Task$DefaultImpls.resume(ByteChannel.kt:227)
	at io.ktor.utils.io.ByteChannel$Slot$Read.resume(ByteChannel.kt:233)
	at io.ktor.utils.io.ByteChannel.flushWriteBuffer(ByteChannel.kt:350)
	at io.ktor.utils.io.ByteChannel.flush(ByteChannel.kt:90)
	at io.ktor.utils.io.ByteReadChannelOperationsKt.copyTo(ByteReadChannelOperations.kt:174)
	at io.ktor.http.cio.ChunkedTransferEncodingKt.decodeChunked(ChunkedTransferEncoding.kt:74)
	at io.ktor.http.cio.ChunkedTransferEncodingKt$decodeChunked$2.invokeSuspend(ChunkedTransferEncoding.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:111)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:99)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:811)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:715)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:702)

Steps to Reproduce

Start a streaming run and call getData<MessageDelta>() on a THREAD_MESSAGE_DELTA event. Example code:

    suspend fun sendRealTimeMessage(
        threadId: String,
        message: String,
    ): Flow<String> =
        flow {

            val openAIMessage = openAI.message(ThreadId(threadId), MessageRequest(Role.User, message))
            openAI
                .createStreamingRun(openAIMessage.threadId, RunRequest(AssistantId(assistantId)))
                .collect { assistantStreamEvent ->
                    when (assistantStreamEvent.type) {

                        AssistantStreamEventType.THREAD_MESSAGE_DELTA -> {
                            // THROWS AN EXCEPTION
                            val messageDelta = assistantStreamEvent.getData<MessageDelta>()
                        }
                        else -> {
                        }
                    }
                }
        }
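Until the model is corrected, one possible workaround is to skip the typed getData<MessageDelta>() call and read the raw JSON payload directly. The sketch below assumes you can get the event's raw payload as a String (how you obtain it from the event is not shown and is an assumption), and extractTextDeltas is a hypothetical helper, not part of openai-kotlin. It only needs the kotlinx-serialization-json runtime.

```kotlin
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.contentOrNull
import kotlinx.serialization.json.jsonArray
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive

// Hypothetical helper: collects the text fragments from a raw
// thread.message.delta payload, treating delta.content as an array.
fun extractTextDeltas(rawPayload: String): List<String> {
    val root = Json.parseToJsonElement(rawPayload).jsonObject
    val content = root["delta"]?.jsonObject?.get("content")?.jsonArray
        ?: return emptyList()
    return content.mapNotNull { part ->
        // Non-text parts (or parts without a text object) are skipped.
        val text = (part as? JsonObject)?.get("text") as? JsonObject
        text?.get("value")?.jsonPrimitive?.contentOrNull
    }
}

fun main() {
    // Payload shaped like the example in the linked API reference.
    val payload = """
        {
          "id": "msg_123",
          "object": "thread.message.delta",
          "delta": {
            "content": [
              { "index": 0, "type": "text", "text": { "value": "Hello", "annotations": [] } }
            ]
          }
        }
    """.trimIndent()
    println(extractTextDeltas(payload)) // prints [Hello]
}
```

Inside the THREAD_MESSAGE_DELTA branch above, the returned fragments could then be emitted into the flow instead of calling getData.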

Environment

  • openai-kotlin version: 4.0.1
  • Kotlin version: 2.1.0
  • OS: macOS
