Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package kotlinx.rpc.grpc.client
import kotlinx.coroutines.flow.Flow
import kotlinx.rpc.grpc.GrpcMetadata
import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.descriptor.MethodDescriptor

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.grpc.client

import kotlinx.rpc.grpc.GrpcCompression
import kotlin.time.Duration

/**
* The collection of runtime options for a new gRPC call.
*
* This class allows configuring per-call behavior such as timeouts.
*/
public class GrpcCallOptions {
/**
* The maximum duration to wait for the RPC to complete.
*
* If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`)
* if it does not complete within the specified duration.
* The timeout is measured from the moment the call is initiated.
* If `null`, no timeout is applied, and the call may run indefinitely.
*
* The default value is `null`.
*
* @see kotlin.time.Duration
*/
public var timeout: Duration? = null

/**
* The compression algorithm to use for encoding outgoing messages in this call.
*
* When set to a value other than [GrpcCompression.None], the client will compress request messages
* using the specified algorithm before sending them to the server. The chosen compression algorithm
* is communicated to the server via the `grpc-encoding` header.
*
* ## Default Behavior
* Defaults to [GrpcCompression.None], meaning no compression is applied to messages.
*
* ## Server Compatibility
* **Important**: It is the caller's responsibility to ensure the server supports the chosen
* compression algorithm. There is no automatic negotiation performed. If the server does not
* support the requested compression, the call will fail.
*
* ## Available Algorithms
* - [GrpcCompression.None]: No compression (identity encoding) - **default**
* - [GrpcCompression.Gzip]: GZIP compression, widely supported
*
* @see GrpcCompression
*/
public var compression: GrpcCompression = GrpcCompression.None
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.rpc.RpcCall
import kotlinx.rpc.RpcClient
import kotlinx.rpc.grpc.GrpcMetadata
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.client.internal.ManagedChannel
import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder
import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.rpc.grpc.client.internal

import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.internal.utils.InternalRpcApi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.StatusCode
import kotlinx.rpc.grpc.StatusException
import kotlinx.rpc.grpc.client.ClientCallScope
import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.client.GrpcClient
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.descriptor.MethodType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.grpc.client.internal
package kotlinx.rpc.grpc.client

import io.grpc.CallOptions
import kotlinx.rpc.grpc.GrpcCompression
import kotlinx.rpc.internal.utils.InternalRpcApi
import java.util.concurrent.TimeUnit

@InternalRpcApi
public fun GrpcCallOptions.toJvm(): CallOptions {
var default = CallOptions.DEFAULT
if (timeout != null) {
default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS)
default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, TimeUnit.MILLISECONDS)
}
if (compression !is GrpcCompression.None) {
default = default.withCompression(compression.name)
}
return default
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package kotlinx.rpc.grpc.client.internal

import io.grpc.Channel
import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.client.toJvm
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.internal.utils.InternalRpcApi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

@file:OptIn(ExperimentalForeignApi::class)

package kotlinx.rpc.grpc.client.internal
package kotlinx.rpc.grpc.client

import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
Expand All @@ -25,4 +25,4 @@ public fun GrpcCallOptions.rawDeadline(): CValue<gpr_timespec> {
gpr_time_from_millis(it.inWholeMilliseconds, GPR_TIMESPAN)
)
} ?: gpr_inf_future(GPR_CLOCK_REALTIME)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.rpc.grpc.client.internal

import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.internal.utils.InternalRpcApi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kotlinx.coroutines.CompletableJob
import kotlinx.rpc.grpc.GrpcMetadata
import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.StatusCode
import kotlinx.rpc.grpc.append
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.internal.BatchResult
import kotlinx.rpc.grpc.internal.CompletionQueue
Expand All @@ -36,6 +37,8 @@ import kotlinx.rpc.grpc.internal.toGrpcByteBuffer
import kotlinx.rpc.grpc.internal.toKotlin
import kotlinx.rpc.protobuf.input.stream.asInputStream
import kotlinx.rpc.protobuf.input.stream.asSource
import kotlinx.rpc.grpc.GrpcCompression
import kotlinx.rpc.grpc.client.GrpcCallOptions
import libkgrpc.GRPC_OP_RECV_INITIAL_METADATA
import libkgrpc.GRPC_OP_RECV_MESSAGE
import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT
Expand Down Expand Up @@ -63,6 +66,7 @@ internal class NativeClientCall<Request, Response>(
private val cq: CompletionQueue,
internal val raw: CPointer<grpc_call>,
private val methodDescriptor: MethodDescriptor<Request, Response>,
private val callOptions: GrpcCallOptions,
private val callJob: CompletableJob,
) : ClientCall<Request, Response>() {

Expand Down Expand Up @@ -308,6 +312,16 @@ internal class NativeClientCall<Request, Response>(
val opsNum = 2uL
val ops = arena.allocArray<grpc_op>(opsNum.convert())

// add compression algorithm to the call metadata.
// the gRPC core will read the header and perform the compression (compression_filter.cc).
if (callOptions.compression !is GrpcCompression.None) {
if (callOptions.compression !is GrpcCompression.Gzip) {
// to match the behavior of grpc-java, we throw an error if the compression algorithm is not supported.
cancelInternal(grpc_status_code.GRPC_STATUS_INTERNAL, "Unable to find compressor by name ${callOptions.compression.name}")
}
headers.append("grpc-internal-encoding-request", callOptions.compression.name)
}

// turn given headers into a grpc_metadata_array.
val sendInitialMetadata: grpc_metadata_array = with(headers) {
arena.allocRawGrpcMetadata()
Expand Down Expand Up @@ -460,4 +474,3 @@ internal class NativeClientCall<Request, Response>(
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.rpc.grpc.client.ClientCredentials
import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.client.rawDeadline
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.internal.CompletionQueue
import kotlinx.rpc.grpc.internal.GrpcRuntime
Expand Down Expand Up @@ -163,7 +165,7 @@ internal class NativeManagedChannel(
grpc_slice_unref(methodNameSlice)

return NativeClientCall(
cq, rawCall, methodDescriptor, callJob
cq, rawCall, methodDescriptor, callOptions, callJob
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.grpc

/**
* Represents a compression algorithm for gRPC message encoding.
*
* Compression can be applied to gRPC messages to reduce bandwidth usage during transmission.
*
* ## Supported Algorithms
* - [None] (identity): No compression is applied.
* - [Gzip]: GZIP compression algorithm, widely supported and provides good compression ratios.
*
* This interface is not meant to be implemented by users.
*
* @property name The compression algorithm identifier sent in the `grpc-encoding` header.
*
* @see kotlinx.rpc.grpc.client.GrpcCallOptions.compression
* @see GrpcCompression.None
* @see GrpcCompression.Gzip
*/
@OptIn(ExperimentalSubclassOptIn::class)
@SubclassOptInRequired
public interface GrpcCompression {

/**
* The name of the compression algorithm as it appears in the `grpc-encoding` header.
*/
public val name: String

/**
* Represents no compression (identity encoding).
*
* This is the default compression setting. When used, messages are transmitted without
* any compression applied.
*/
public object None : GrpcCompression {
override val name: String = "identity"
}

/**
* Represents GZIP compression.
*
* GZIP is a widely supported compression algorithm that provides good compression ratios
* for most data types.
*
* **Note**: Ensure the server supports GZIP compression before using this option,
* as the call will fail if the server cannot handle the requested compression algorithm.
*/
public object Gzip : GrpcCompression {
override val name: String = "gzip"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.StatusCode
import kotlinx.rpc.grpc.client.createInsecureClientCredentials
import kotlinx.rpc.grpc.client.internal.ClientCall
import kotlinx.rpc.grpc.client.internal.GrpcCallOptions
import kotlinx.rpc.grpc.client.GrpcCallOptions
import kotlinx.rpc.grpc.client.internal.ManagedChannel
import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder
import kotlinx.rpc.grpc.client.internal.buildChannel
Expand Down
Loading
Loading