Skip to content

Commit a91a285

Browse files
committed
WebRtc Client. Add Android implementation for datachannels.
1 parent d5f6531 commit a91a285

File tree

13 files changed

+315
-79
lines changed

13 files changed

+315
-79
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.client.webrtc
6+
7+
import kotlinx.coroutines.CoroutineScope
8+
import kotlinx.coroutines.CoroutineStart
9+
import kotlinx.coroutines.launch
10+
import org.webrtc.DataChannel
11+
import java.nio.ByteBuffer
12+
13+
/**
14+
* WebRtc data channel implementation for the Android platform.
15+
*/
16+
public class AndroidWebRtcDataChannel(
17+
internal val nativeChannel: DataChannel,
18+
private val channelInit: DataChannel.Init?,
19+
private val coroutineScope: CoroutineScope,
20+
options: WebRtcDataChannelOptions
21+
) : WebRtcDataChannel(options) {
22+
23+
override val id: Int
24+
get() = nativeChannel.id()
25+
26+
override val label: String
27+
get() = nativeChannel.label()
28+
29+
override val state: WebRtc.DataChannelState
30+
get() = nativeChannel.state().toKtor()
31+
32+
override val bufferedAmount: Long
33+
get() = nativeChannel.bufferedAmount()
34+
35+
override var bufferedAmountLowThreshold: Long = 0
36+
private set
37+
38+
override val maxPacketLifeTime: Int?
39+
get() = channelInit?.maxRetransmitTimeMs
40+
41+
override val maxRetransmits: Int?
42+
get() = channelInit?.maxRetransmits
43+
44+
override val negotiated: Boolean
45+
get() = channelInit?.negotiated ?: error("Can't retrieve negotiated state on Android")
46+
47+
override val ordered: Boolean
48+
get() = channelInit?.ordered ?: error("Can't retrieve ordered state on Android")
49+
50+
override val protocol: String
51+
get() = channelInit?.protocol ?: error("Protocol is not supported in WebRTC")
52+
53+
private fun checkStatus() {
54+
if (state == WebRtc.DataChannelState.CLOSING || state == WebRtc.DataChannelState.CLOSED) {
55+
error("Data channel is closed.")
56+
}
57+
}
58+
59+
override fun send(text: String) {
60+
checkStatus()
61+
val buffer = DataChannel.Buffer(Charsets.UTF_8.encode(text), false)
62+
nativeChannel.send(buffer)
63+
}
64+
65+
override fun send(bytes: ByteArray) {
66+
checkStatus()
67+
val buffer = DataChannel.Buffer(ByteBuffer.wrap(bytes), true)
68+
nativeChannel.send(buffer)
69+
}
70+
71+
override fun setBufferedAmountLowThreshold(threshold: Long) {
72+
bufferedAmountLowThreshold = threshold
73+
}
74+
75+
override fun close() {
76+
nativeChannel.close()
77+
}
78+
79+
internal fun setupEvents(eventsEmitter: WebRtcConnectionEventsEmitter) {
80+
nativeChannel.registerObserver(object : DataChannel.Observer {
81+
override fun onStateChange() {
82+
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) {
83+
val event = when (state) {
84+
WebRtc.DataChannelState.CONNECTING -> null
85+
WebRtc.DataChannelState.OPEN -> DataChannelEvent.Open(this@AndroidWebRtcDataChannel)
86+
WebRtc.DataChannelState.CLOSING -> DataChannelEvent.Closing(this@AndroidWebRtcDataChannel)
87+
WebRtc.DataChannelState.CLOSED -> {
88+
stopReceivingMessages()
89+
DataChannelEvent.Closed(this@AndroidWebRtcDataChannel)
90+
}
91+
}
92+
event?.let { eventsEmitter.emitDataChannelEvent(it) }
93+
}
94+
}
95+
96+
override fun onMessage(buffer: DataChannel.Buffer?) {
97+
// This coroutine should start immediately because the protocol relies on the messages order
98+
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) {
99+
if (buffer == null) {
100+
return@launch
101+
}
102+
val message = when (buffer.binary) {
103+
true -> {
104+
val data = ByteArray(buffer.data.remaining()).apply { buffer.data.get(this) }
105+
Message.Binary(data)
106+
}
107+
108+
false -> Message.Text(Charsets.UTF_8.decode(buffer.data).toString())
109+
}
110+
emitMessage(message)
111+
}
112+
}
113+
114+
override fun onBufferedAmountChange(previousAmount: Long) {
115+
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) {
116+
if (previousAmount > bufferedAmountLowThreshold && bufferedAmountLowThreshold > bufferedAmount) {
117+
return@launch
118+
}
119+
val event = DataChannelEvent.BufferedAmountLow(this@AndroidWebRtcDataChannel)
120+
eventsEmitter.emitDataChannelEvent(event)
121+
}
122+
}
123+
})
124+
}
125+
}
126+
127+
/**
128+
* Returns implementation of the data channel that is used under the hood. Use it with caution.
129+
*/
130+
public fun WebRtcDataChannel.getNative(): DataChannel {
131+
val channel = (this as? AndroidWebRtcDataChannel) ?: error("Wrong data channel implementation.")
132+
return channel.nativeChannel
133+
}

ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/Engine.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class AndroidWebRtcEngine(
5454
iceTransportsType = config.iceTransportPolicy.toNative()
5555
}
5656

57-
val coroutineContext = createConnectionContext(config.coroutinesContext)
57+
val coroutineContext = createConnectionContext(config.coroutineContext)
5858
return AndroidWebRtcPeerConnection(coroutineContext, config).initialize { observer ->
5959
getLocalFactory().createPeerConnection(rtcConfig, observer)
6060
}

ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/PeerConnection.kt

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package io.ktor.client.webrtc
66

77
import io.ktor.client.webrtc.media.*
8+
import kotlinx.coroutines.CoroutineStart
9+
import kotlinx.coroutines.launch
810
import org.webrtc.*
911
import org.webrtc.PeerConnection.Observer
1012
import kotlin.coroutines.CoroutineContext
@@ -25,7 +27,7 @@ public class AndroidWebRtcPeerConnection(
2527
if (this::peerConnection.isInitialized.not()) {
2628
cont.resume(emptyList())
2729
}
28-
peerConnection.getStats { cont.resume(it.toCommon()) }
30+
peerConnection.getStats { cont.resume(it.toKtor()) }
2931
}
3032

3133
// helper method to break a dependency cycle (PeerConnection -> PeerConnectionFactory -> Observer)
@@ -44,84 +46,120 @@ public class AndroidWebRtcPeerConnection(
4446
if (hasVideo) mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"))
4547
}
4648

49+
private inline fun runInConnectionScope(crossinline block: () -> Unit) {
50+
// Runs a `block` in the coroutine of the peer connection not to lose possible exceptions.
51+
// We are already running on the special thread, so extra dispatching is not required.
52+
// Moreover, dispatching the coroutine on another thread could break the internal `org.webrtc` logic.
53+
// For instance, it silently breaks registering a data channel observer.
54+
coroutineScope.launch(start = CoroutineStart.UNDISPATCHED) { block() }
55+
}
56+
4757
private fun createObserver() = object : Observer {
48-
override fun onIceCandidate(candidate: IceCandidate?) {
49-
if (candidate == null) return
50-
events.emitIceCandidate(candidate.toCommon())
58+
override fun onIceCandidate(candidate: IceCandidate?) = runInConnectionScope {
59+
if (candidate == null) return@runInConnectionScope
60+
events.emitIceCandidate(candidate.toKtor())
5161
}
5262

5363
override fun onIceCandidatesRemoved(candidates: Array<out IceCandidate>?) = Unit
5464

55-
override fun onAddTrack(receiver: RtpReceiver?, mediaStreams: Array<out MediaStream>?) {
56-
if (receiver == null) return
65+
override fun onAddTrack(receiver: RtpReceiver?, mediaStreams: Array<out MediaStream>?) = runInConnectionScope {
66+
if (receiver == null) return@runInConnectionScope
5767
receiver.track()?.let {
5868
events.emitAddTrack(AndroidMediaTrack.from(it))
5969
}
6070
}
6171

62-
override fun onRemoveTrack(receiver: RtpReceiver?) {
63-
if (receiver == null) return
72+
override fun onRemoveTrack(receiver: RtpReceiver?) = runInConnectionScope {
73+
if (receiver == null) return@runInConnectionScope
6474
receiver.track()?.let {
6575
events.emitRemoveTrack(AndroidMediaTrack.from(it))
6676
}
6777
}
6878

69-
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
70-
val commonState = newState.toCommon() ?: return
79+
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) = runInConnectionScope {
80+
val commonState = newState.toKtor() ?: return@runInConnectionScope
7181
events.emitIceConnectionStateChange(commonState)
7282
}
7383

74-
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
75-
val commonState = newState.toCommon() ?: return
84+
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) = runInConnectionScope {
85+
val commonState = newState.toKtor() ?: return@runInConnectionScope
7686
events.emitConnectionStateChange(commonState)
7787
}
7888

79-
override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
80-
val commonState = newState.toCommon() ?: return
89+
override fun onSignalingChange(newState: PeerConnection.SignalingState?) = runInConnectionScope {
90+
val commonState = newState.toKtor() ?: return@runInConnectionScope
8191
events.emitSignalingStateChange(commonState)
8292
}
8393

84-
override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
85-
val commonState = newState.toCommon() ?: return
94+
override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) = runInConnectionScope {
95+
val commonState = newState.toKtor() ?: return@runInConnectionScope
8696
events.emitIceGatheringStateChange(commonState)
8797
}
8898

8999
override fun onIceConnectionReceivingChange(receiving: Boolean) = Unit
90-
override fun onRenegotiationNeeded(): Unit = events.emitNegotiationNeeded()
100+
override fun onRenegotiationNeeded() = runInConnectionScope {
101+
events.emitNegotiationNeeded()
102+
}
91103

92104
// we omit streams and operate with tracks instead
93105
override fun onAddStream(p0: MediaStream?) = Unit
94106
override fun onRemoveStream(p0: MediaStream?) = Unit
95107

96-
// #TODO: implement data channels
97-
override fun onDataChannel(dataChannel: DataChannel?) = Unit
108+
override fun onDataChannel(dataChannel: DataChannel?) = runInConnectionScope {
109+
if (dataChannel == null) return@runInConnectionScope
110+
val channel = AndroidWebRtcDataChannel(
111+
nativeChannel = dataChannel,
112+
channelInit = null,
113+
coroutineScope = coroutineScope,
114+
options = WebRtcDataChannelOptions()
115+
)
116+
channel.setupEvents(events)
117+
}
98118
}
99119

100120
override val localDescription: WebRtc.SessionDescription?
101-
get() = peerConnection.localDescription?.toCommon()
121+
get() = peerConnection.localDescription?.toKtor()
102122

103123
override val remoteDescription: WebRtc.SessionDescription?
104-
get() = peerConnection.remoteDescription?.toCommon()
124+
get() = peerConnection.remoteDescription?.toKtor()
105125

106126
override suspend fun createOffer(): WebRtc.SessionDescription {
107127
val offer = suspendCoroutine { cont ->
108128
peerConnection.createOffer(cont.resumeAfterSdpCreate(), offerConstraints())
109129
}
110-
return offer.toCommon()
130+
return offer.toKtor()
111131
}
112132

113133
override suspend fun createAnswer(): WebRtc.SessionDescription {
114134
val answer = suspendCoroutine { cont ->
115135
peerConnection.createAnswer(cont.resumeAfterSdpCreate(), offerConstraints())
116136
}
117-
return answer.toCommon()
137+
return answer.toKtor()
118138
}
119139

120140
override suspend fun createDataChannel(
121141
label: String,
122142
options: WebRtcDataChannelOptions.() -> Unit
123143
): WebRtcDataChannel {
124-
TODO("Not yet implemented")
144+
val options = WebRtcDataChannelOptions().apply(options)
145+
val channelInit = DataChannel.Init().apply {
146+
if (options.id != null) {
147+
id = options.id!!
148+
}
149+
if (options.maxRetransmits != null) {
150+
maxRetransmits = options.maxRetransmits!!
151+
}
152+
if (options.maxPacketLifeTime != null) {
153+
maxRetransmitTimeMs = options.maxPacketLifeTime?.inWholeMilliseconds?.toInt()!!
154+
}
155+
ordered = options.ordered
156+
protocol = options.protocol
157+
negotiated = options.negotiated
158+
}
159+
val nativeChannel = peerConnection.createDataChannel(label, channelInit)
160+
return AndroidWebRtcDataChannel(nativeChannel, channelInit, coroutineScope, options).apply {
161+
setupEvents(events)
162+
}
125163
}
126164

127165
override suspend fun setLocalDescription(description: WebRtc.SessionDescription) {

ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/Utils.kt

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44

55
package io.ktor.client.webrtc
66

7-
import org.webrtc.IceCandidate
8-
import org.webrtc.PeerConnection
9-
import org.webrtc.RTCStatsReport
10-
import org.webrtc.SdpObserver
11-
import org.webrtc.SessionDescription
7+
import org.webrtc.*
128
import kotlin.coroutines.Continuation
139
import kotlin.coroutines.resume
1410
import kotlin.coroutines.resumeWithException
@@ -41,7 +37,7 @@ internal fun WebRtc.RtcpMuxPolicy.toNative(): PeerConnection.RtcpMuxPolicy = whe
4137
WebRtc.RtcpMuxPolicy.REQUIRE -> PeerConnection.RtcpMuxPolicy.REQUIRE
4238
}
4339

44-
internal fun PeerConnection.IceConnectionState?.toCommon(): WebRtc.IceConnectionState? = when (this) {
40+
internal fun PeerConnection.IceConnectionState?.toKtor(): WebRtc.IceConnectionState? = when (this) {
4541
PeerConnection.IceConnectionState.NEW -> WebRtc.IceConnectionState.NEW
4642
PeerConnection.IceConnectionState.FAILED -> WebRtc.IceConnectionState.FAILED
4743
PeerConnection.IceConnectionState.CLOSED -> WebRtc.IceConnectionState.CLOSED
@@ -52,7 +48,7 @@ internal fun PeerConnection.IceConnectionState?.toCommon(): WebRtc.IceConnection
5248
else -> null
5349
}
5450

55-
internal fun PeerConnection.PeerConnectionState?.toCommon(): WebRtc.ConnectionState? = when (this) {
51+
internal fun PeerConnection.PeerConnectionState?.toKtor(): WebRtc.ConnectionState? = when (this) {
5652
PeerConnection.PeerConnectionState.NEW -> WebRtc.ConnectionState.NEW
5753
PeerConnection.PeerConnectionState.CLOSED -> WebRtc.ConnectionState.CLOSED
5854
PeerConnection.PeerConnectionState.CONNECTED -> WebRtc.ConnectionState.CONNECTED
@@ -62,7 +58,7 @@ internal fun PeerConnection.PeerConnectionState?.toCommon(): WebRtc.ConnectionSt
6258
else -> null
6359
}
6460

65-
internal fun PeerConnection.SignalingState?.toCommon(): WebRtc.SignalingState? = when (this) {
61+
internal fun PeerConnection.SignalingState?.toKtor(): WebRtc.SignalingState? = when (this) {
6662
PeerConnection.SignalingState.STABLE -> WebRtc.SignalingState.STABLE
6763
PeerConnection.SignalingState.CLOSED -> WebRtc.SignalingState.CLOSED
6864
PeerConnection.SignalingState.HAVE_LOCAL_OFFER -> WebRtc.SignalingState.HAVE_LOCAL_OFFER
@@ -72,14 +68,21 @@ internal fun PeerConnection.SignalingState?.toCommon(): WebRtc.SignalingState? =
7268
else -> null
7369
}
7470

75-
internal fun PeerConnection.IceGatheringState?.toCommon(): WebRtc.IceGatheringState? = when (this) {
71+
internal fun PeerConnection.IceGatheringState?.toKtor(): WebRtc.IceGatheringState? = when (this) {
7672
PeerConnection.IceGatheringState.NEW -> WebRtc.IceGatheringState.NEW
7773
PeerConnection.IceGatheringState.COMPLETE -> WebRtc.IceGatheringState.COMPLETE
7874
PeerConnection.IceGatheringState.GATHERING -> WebRtc.IceGatheringState.GATHERING
7975
else -> null
8076
}
8177

82-
internal fun SessionDescription.toCommon(): WebRtc.SessionDescription {
78+
internal fun DataChannel.State.toKtor(): WebRtc.DataChannelState = when (this) {
79+
DataChannel.State.CONNECTING -> WebRtc.DataChannelState.CONNECTING
80+
DataChannel.State.OPEN -> WebRtc.DataChannelState.OPEN
81+
DataChannel.State.CLOSING -> WebRtc.DataChannelState.CLOSING
82+
DataChannel.State.CLOSED -> WebRtc.DataChannelState.CLOSED
83+
}
84+
85+
internal fun SessionDescription.toKtor(): WebRtc.SessionDescription {
8386
return WebRtc.SessionDescription(
8487
when (requireNotNull(type)) {
8588
SessionDescription.Type.OFFER -> WebRtc.SessionDescriptionType.OFFER
@@ -91,7 +94,7 @@ internal fun SessionDescription.toCommon(): WebRtc.SessionDescription {
9194
)
9295
}
9396

94-
internal fun RTCStatsReport.toCommon(): List<WebRtc.Stats> = statsMap.values.map {
97+
internal fun RTCStatsReport.toKtor(): List<WebRtc.Stats> = statsMap.values.map {
9598
WebRtc.Stats(
9699
it.id,
97100
it.type,
@@ -100,7 +103,7 @@ internal fun RTCStatsReport.toCommon(): List<WebRtc.Stats> = statsMap.values.map
100103
)
101104
}
102105

103-
internal fun IceCandidate.toCommon(): WebRtc.IceCandidate = WebRtc.IceCandidate(
106+
internal fun IceCandidate.toKtor(): WebRtc.IceCandidate = WebRtc.IceCandidate(
104107
candidate = sdp,
105108
sdpMid = sdpMid,
106109
sdpMLineIndex = sdpMLineIndex

0 commit comments

Comments
 (0)