Description
In documentation, there is no example shown of how to send a request from server -> client over an RPC call, only client -> server.
I am not sure if this is just a missing feature, or just missing documentation, but I find this a critical feature for my application, and would like to see the "right way to do it".
I have implemented an interval-polled subscribe system which I use to accomplish this, but if there's a "right way" to do this, that would be preferred.
Something maybe like this:
// in API module
@ClientRpc
interface ClientEventService : ClientRemoteService {
suspend fun receiveEvent(event: Event)
}
// in client module
class ClientEventService : ClientEventService {
override suspend fun receiveEvent(event: Event) {
when (event) {
is MessageEvent -> {
println(event.message ?: return)
}
else -> println("Unhandled event: $event")
}
}
}
registerService<ClientEventService> { ctx -> ClientServiceImpl(ctx) }
// in server module code under an RPC request something like:
val clientEventService: ClientEventService = request.withService()
clientEventService.receiveEvent(MessageEvent("Hello world!"))
Here is a stripped-down version of my implementation to achieve something similar, although with the drawback that it only polls at a set interval and thus has polling overhead + delay:
API module (client and server both depend on this code):
@Serializable
sealed interface Event
@Serializable
data class MessageEvent(
val message: String?
) : Event
@Rpc
interface EventService : RemoteService {
suspend fun subscribe(uuid: Long)
suspend fun pollEvents(uuid: Long): List<Event>?
}
Server module:
class EventServiceImpl(
override val coroutineContext: CoroutineContext
) : EventService {
private val uuidToEvents: MutableMap<Long, BlockingQueue<Event>> =
ConcurrentHashMap()
private fun emitEvent(
uuid: Long,
event: Event
): Boolean {
val events = uuidToEvents[uuid] ?: return false
return events.add(event)
}
override suspend fun subscribe(uuid: Long) {
check(!uuidToEvents.containsKey(uuid)) {
"UUID $uuid is already subscribed"
}
uuidToEvents[uuid] = ArrayBlockingQueue(256, true)
emitEvent(uuid, MessageEvent("Successfully subscribed UUID $uuid"))
}
override suspend fun pollEvents(uuid: Long): List<Event>? {
val events = uuidToEvents[uuid] ?: return null
val polledEvents: MutableList<Event> = mutableListOf()
events.drainTo(polledEvents)
return polledEvents
}
}
Then a client can utilize it like this:
const val OUR_UUID = 1234L
const val POLL_INTERVAL_NANOS = 100L * 1000 * 1000 // 100 milliseconds
val eventService: EventService = rpcClient.withService()
eventService.subscribe(OUR_UUID)
fun pollEvents(uuid: Long) {
val events: List<Event> = eventService.pollEvents(uuid) ?: return
for (event in events) {
when (event) {
is MessageEvent -> {
println(event.message ?: return)
}
else -> println("Unhandled event: $event")
}
}
}
while (true) {
val elapsedNanos = measureNanoTime {
pollEvents(OUR_UUID)
}
val delayNanos = POLL_INTERVAL_NANOS - elapsedNanos
if (delayNanos > 0) {
val delayMillis = delayNanos.milliseconds
if (delayMillis > 0) {
delay(delayMillis)
}
}
}