Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.webrtc

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import org.webrtc.DataChannel
import java.nio.ByteBuffer

/**
* WebRtc data channel implementation for the Android platform.
*/
public class AndroidWebRtcDataChannel(
internal val nativeChannel: DataChannel,
private val channelInit: DataChannel.Init?,
private val coroutineScope: CoroutineScope,
receiveOptions: DataChannelReceiveOptions
) : WebRtcDataChannel(receiveOptions) {

override val id: Int
get() = nativeChannel.id()

override val label: String
get() = nativeChannel.label()

override val state: WebRtc.DataChannel.State
get() = nativeChannel.state().toKtor()

override val bufferedAmount: Long
get() = nativeChannel.bufferedAmount()

override var bufferedAmountLowThreshold: Long = 0
private set

override val maxPacketLifeTime: Int?
get() = channelInit?.maxRetransmitTimeMs

override val maxRetransmits: Int?
get() = channelInit?.maxRetransmits

override val negotiated: Boolean
get() = channelInit?.negotiated ?: error("Can't retrieve negotiated state on Android")

override val ordered: Boolean
get() = channelInit?.ordered ?: error("Can't retrieve ordered state on Android")

override val protocol: String
get() = channelInit?.protocol ?: error("Protocol is not supported in WebRTC")

private fun checkStatus() {
if (!state.canSend()) {
error("Data channel is closed.")
}
}

override fun send(text: String) {
checkStatus()
val buffer = DataChannel.Buffer(Charsets.UTF_8.encode(text), false)
nativeChannel.send(buffer)
}

override fun send(bytes: ByteArray) {
checkStatus()
val buffer = DataChannel.Buffer(ByteBuffer.wrap(bytes), true)
nativeChannel.send(buffer)
}

override fun setBufferedAmountLowThreshold(threshold: Long) {
bufferedAmountLowThreshold = threshold
}

override fun close() {
nativeChannel.close()
}

internal fun setupEvents(eventsEmitter: WebRtcConnectionEventsEmitter) {
nativeChannel.registerObserver(object : DataChannel.Observer {
override fun onStateChange() {
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) {
val event = when (state) {
WebRtc.DataChannel.State.CONNECTING -> null
WebRtc.DataChannel.State.OPEN -> DataChannelEvent.Open(this@AndroidWebRtcDataChannel)
WebRtc.DataChannel.State.CLOSING -> DataChannelEvent.Closing(this@AndroidWebRtcDataChannel)
WebRtc.DataChannel.State.CLOSED -> {
stopReceivingMessages()
DataChannelEvent.Closed(this@AndroidWebRtcDataChannel)
}
}
event?.let { eventsEmitter.emitDataChannelEvent(it) }
}
}

override fun onMessage(buffer: DataChannel.Buffer?) {
// This coroutine should start immediately because the protocol relies on the message order
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) {
if (buffer == null) {
return@launch
}
val message = when (buffer.binary) {
true -> {
val data = ByteArray(buffer.data.remaining()).apply { buffer.data.get(this) }
WebRtc.DataChannel.Message.Binary(data)
}

false -> {
val data = Charsets.UTF_8.decode(buffer.data).toString()
WebRtc.DataChannel.Message.Text(data)
}
}
emitMessage(message)
}
}

override fun onBufferedAmountChange(previousAmount: Long) {
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) {
if (previousAmount > bufferedAmountLowThreshold && bufferedAmountLowThreshold > bufferedAmount) {
return@launch
}
val event = DataChannelEvent.BufferedAmountLow(this@AndroidWebRtcDataChannel)
eventsEmitter.emitDataChannelEvent(event)
}
}
})
}
}

/**
* Returns implementation of the data channel that is used under the hood. Use it with caution.
*/
public fun WebRtcDataChannel.getNative(): DataChannel {
val channel = (this as? AndroidWebRtcDataChannel) ?: error("Wrong data channel implementation.")
return channel.nativeChannel
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public class AndroidWebRtcEngine(
.createIceServer()
}

override suspend fun createPeerConnection(connectionConfig: WebRtcConnectionConfig?): WebRtcPeerConnection {
val config = connectionConfig ?: WebRtcConnectionConfig().apply(config.defaultConnectionConfig)
override suspend fun createPeerConnection(config: WebRtcConnectionConfig): WebRtcPeerConnection {
val iceServers = config.iceServers.map { it.toNative() }
val rtcConfig = RTCConfiguration(iceServers).apply {
bundlePolicy = config.bundlePolicy.toNative()
Expand All @@ -55,6 +54,7 @@ public class AndroidWebRtcEngine(
iceTransportsType = config.iceTransportPolicy.toNative()
}

val coroutineContext = createConnectionContext(config.coroutineContext)
return AndroidWebRtcPeerConnection(coroutineContext, config).initialize { observer ->
getLocalFactory().createPeerConnection(rtcConfig, observer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.ktor.client.webrtc

import io.ktor.client.webrtc.media.*
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import org.webrtc.*
import org.webrtc.PeerConnection.Observer
Expand All @@ -26,7 +27,7 @@ public class AndroidWebRtcPeerConnection(
if (this::peerConnection.isInitialized.not()) {
cont.resume(emptyList())
}
peerConnection.getStats { cont.resume(it.toCommon()) }
peerConnection.getStats { cont.resume(it.toKtor()) }
}

// helper method to break a dependency cycle (PeerConnection -> PeerConnectionFactory -> Observer)
Expand All @@ -45,81 +46,121 @@ public class AndroidWebRtcPeerConnection(
if (hasVideo) mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
}

private inline fun runInConnectionScope(crossinline block: () -> Unit) {
// Runs a `block` in the coroutine of the peer connection not to lose possible exceptions.
// We are already running on the special thread, so extra dispatching is not required.
// Moreover, dispatching the coroutine on another thread could break the internal `org.webrtc` logic.
// For instance, it silently breaks registering a data channel observer.
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) { block() }
}

private fun createObserver() = object : Observer {
override fun onIceCandidate(candidate: IceCandidate?) {
if (candidate == null) return
events.emitIceCandidate(candidate.toCommon())
override fun onIceCandidate(candidate: IceCandidate?) = runInConnectionScope {
if (candidate == null) return@runInConnectionScope
events.emitIceCandidate(candidate.toKtor())
}

override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) = Unit

override fun onAddTrack(receiver: RtpReceiver?, mediaStreams: Array<out MediaStream>?) {
if (receiver == null) return
launch {
receiver.track()?.let {
events.emitAddTrack(AndroidMediaTrack.from(it))
}
override fun onAddTrack(receiver: RtpReceiver?, mediaStreams: Array<out MediaStream>?) = runInConnectionScope {
if (receiver == null) return@runInConnectionScope
receiver.track()?.let {
events.emitAddTrack(AndroidMediaTrack.from(it))
}
}

override fun onRemoveTrack(receiver: RtpReceiver?) {
if (receiver == null) return
launch {
receiver.track()?.let {
events.emitRemoveTrack(AndroidMediaTrack.from(it))
}
override fun onRemoveTrack(receiver: RtpReceiver?) = runInConnectionScope {
if (receiver == null) return@runInConnectionScope
receiver.track()?.let {
events.emitRemoveTrack(AndroidMediaTrack.from(it))
}
}

override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
val commonState = newState.toCommon() ?: return
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) = runInConnectionScope {
val commonState = newState.toKtor() ?: return@runInConnectionScope
events.emitIceConnectionStateChange(commonState)
}

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
val commonState = newState.toCommon() ?: return
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) = runInConnectionScope {
val commonState = newState.toKtor() ?: return@runInConnectionScope
events.emitConnectionStateChange(commonState)
}

override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
val commonState = newState.toCommon() ?: return
override fun onSignalingChange(newState: PeerConnection.SignalingState?) = runInConnectionScope {
val commonState = newState.toKtor() ?: return@runInConnectionScope
events.emitSignalingStateChange(commonState)
}

override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
val commonState = newState.toCommon() ?: return
override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) = runInConnectionScope {
val commonState = newState.toKtor() ?: return@runInConnectionScope
events.emitIceGatheringStateChange(commonState)
}

override fun onIceConnectionReceivingChange(receiving: Boolean) = Unit
override fun onRenegotiationNeeded(): Unit = events.emitNegotiationNeeded()
override fun onRenegotiationNeeded() = runInConnectionScope {
events.emitNegotiationNeeded()
}

// we omit streams and operate with tracks instead
override fun onAddStream(p0: MediaStream?) = Unit
override fun onRemoveStream(p0: MediaStream?) = Unit

// #TODO: implement data channels
override fun onDataChannel(dataChannel: DataChannel?) = Unit
override fun onDataChannel(dataChannel: DataChannel?) = runInConnectionScope {
if (dataChannel == null) return@runInConnectionScope
val channel = AndroidWebRtcDataChannel(
nativeChannel = dataChannel,
channelInit = null,
coroutineScope = coroutineScope,
receiveOptions = DataChannelReceiveOptions()
)
channel.setupEvents(events)
}
}

override val localDescription: WebRtc.SessionDescription?
get() = peerConnection.localDescription?.toCommon()
get() = peerConnection.localDescription?.toKtor()

override val remoteDescription: WebRtc.SessionDescription?
get() = peerConnection.remoteDescription?.toCommon()
get() = peerConnection.remoteDescription?.toKtor()

override suspend fun createOffer(): WebRtc.SessionDescription {
val offer = suspendCoroutine { cont ->
peerConnection.createOffer(cont.resumeAfterSdpCreate(), offerConstraints())
}
return offer.toCommon()
return offer.toKtor()
}

override suspend fun createAnswer(): WebRtc.SessionDescription {
val answer = suspendCoroutine { cont ->
peerConnection.createAnswer(cont.resumeAfterSdpCreate(), offerConstraints())
}
return answer.toCommon()
return answer.toKtor()
}

override suspend fun createDataChannel(
label: String,
options: WebRtcDataChannelOptions.() -> Unit
): WebRtcDataChannel {
val options = WebRtcDataChannelOptions().apply(options)
val channelInit = DataChannel.Init().apply {
if (options.id != null) {
id = options.id!!
}
if (options.maxRetransmits != null) {
maxRetransmits = options.maxRetransmits!!
}
if (options.maxPacketLifeTime != null) {
maxRetransmitTimeMs = options.maxPacketLifeTime?.inWholeMilliseconds?.toInt()!!
}
ordered = options.ordered
protocol = options.protocol
negotiated = options.negotiated
}
val nativeChannel = peerConnection.createDataChannel(label, channelInit)
val receiveOptions = DataChannelReceiveOptions().apply(options.receiveOptions)
return AndroidWebRtcDataChannel(nativeChannel, channelInit, coroutineScope, receiveOptions).apply {
setupEvents(events)
}
}

override suspend fun setLocalDescription(description: WebRtc.SessionDescription) {
Expand Down
Loading